# PPT/终版2/毕设.py
import os
import re
import base64
import warnings
import sys # Add sys import for debugging
import ctypes # For short path conversion
from ctypes import wintypes # For short path conversion
# import opencc # <-- opencc 导入会保留,但路径处理提前
# --- Manually add D:\Lib\site-packages to sys.path ---
site_packages_path = r'D:\Lib\site-packages' # Corrected path separator
if site_packages_path not in sys.path:
sys.path.append(site_packages_path)
# --- End of manual addition ---
# --- Manually add path for opencc if needed ---
opencc_site_packages_path = r'C:\Users\86138\AppData\Roaming\Python\Python311\site-packages' # Corrected path separator
if opencc_site_packages_path not in sys.path:
sys.path.insert(0, opencc_site_packages_path) # Insert at the beginning
print(f"--- Debug --- Added to sys.path for opencc: {opencc_site_packages_path}")
# --- End of manual addition for opencc ---
# Now try importing opencc
import opencc
print(f"--- Debug --- Attempting to import imageio in: {__file__}") # Debug print
print(f"--- Debug --- Python executable: {sys.executable}") # Debug print
print(f"--- Debug --- sys.path AFTER manual add: {sys.path}") # Debug print, note the change in message
try: # Debug block
import imageio as test_imageio_module
print(f"--- Debug --- Found 'imageio' at: {test_imageio_module.__file__}")
print(f"--- Debug --- Version of 'imageio': {test_imageio_module.__version__}")
except ImportError as e:
print(f"--- Debug --- ImportError for imageio: {e}")
except AttributeError: # Handle cases where __file__ or __version__ might be missing
print(f"--- Debug --- Found 'imageio', but cannot get __file__ or __version__.")
# The original import line
import imageio
import whisper
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as ssim
from collections import defaultdict
import subprocess
from jinja2 import Environment
import cv2
from scipy.signal import find_peaks
from skimage.feature import hog
from skimage.color import rgb2gray
import concurrent.futures
import threading
import queue
import time
import gc
from functools import lru_cache
import multiprocessing
import signal
import traceback
import logging
import json
import shutil
import importlib
# 导入补丁模块 - 用于解决wkhtmltopdf依赖问题
try:
import pdfkit_patch as pdfkit
logging.info("已加载pdfkit补丁模块")
except ImportError:
logging.info("未找到pdfkit补丁模块PDF生成功能可能不可用")
# 设置环境变量,使用 OpenBLAS
os.environ['OPENBLAS_NUM_THREADS'] = '1'
os.environ['MKL_NUM_THREADS'] = '1'
os.environ['NUMEXPR_NUM_THREADS'] = '1'
os.environ['OMP_NUM_THREADS'] = '1'
# FFmpeg路径配置
FFMPEG_BIN = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ffmpeg", "bin")
print(f"--- Debug --- FFMPEG_BIN calculated as: {repr(FFMPEG_BIN)}") # DEBUG LINE ADDED
if not os.path.exists(FFMPEG_BIN):
FFMPEG_BIN = "" # 如果目录不存在使用系统环境变量中的FFmpeg
print(f"--- Debug --- FFMPEG_BIN reset to empty string because path does not exist.") # DEBUG LINE ADDED
# 配置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('process.log', encoding='utf-8'),
logging.StreamHandler()
]
)
# Helper function for short path
def get_short_path_name(long_name):
"""Gets the short path name of a given long path."""
output_buf_size = 0
# First, call GetShortPathNameW with a null buffer to get the required buffer size.
# We expect this to fail and return the size.
output_buf_size = ctypes.windll.kernel32.GetShortPathNameW(long_name, None, 0)
if output_buf_size == 0:
# An error occurred, perhaps the path doesn't exist or another issue.
print(f"--- Debug --- GetShortPathNameW failed to get buffer size for: {long_name}, error code: {ctypes.GetLastError()}")
return long_name # Return original name if conversion fails
output_buf = ctypes.create_unicode_buffer(output_buf_size)
needed = ctypes.windll.kernel32.GetShortPathNameW(long_name, output_buf, output_buf_size)
if needed == 0:
print(f"--- Debug --- GetShortPathNameW failed to convert: {long_name}, error code: {ctypes.GetLastError()}")
return long_name # Return original name if conversion fails
else:
return output_buf.value
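# Illustrative usage sketch (not executed): get_short_path_name() asks Windows for the
# legacy 8.3 short form of a path, which can sidestep tools that mishandle non-ASCII
# characters in file paths. The example path below is hypothetical; the helper does not
# appear to be called elsewhere in this file.
#
#   safe_path = get_short_path_name(r"C:\Videos\演示视频.mp4")
#   # On failure (non-Windows, missing file, 8.3 names disabled) the original path is
#   # returned unchanged, so the result can be used directly.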
def check_dependencies():
try:
# 检查FFmpeg
try:
subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
logging.info("FFmpeg 检查通过")
except Exception as e:
logging.error(f"FFmpeg 检查失败: {str(e)}")
return False
# 检查OpenCV
try:
import cv2
logging.info("OpenCV 检查通过")
except Exception as e:
logging.error(f"OpenCV 检查失败: {str(e)}")
return False
# 检查Whisper
try:
import whisper
logging.info("Whisper 检查通过")
except Exception as e:
logging.error(f"Whisper 检查失败: {str(e)}")
return False
# 注意: wkhtmltopdf检查已禁用
# 使用pdfkit_patch模块解决wkhtmltopdf依赖问题
logging.info("wkhtmltopdf检查已禁用仅生成HTML报告")
logging.info("所有依赖项检查通过")
return True
except Exception as e:
logging.error(f"依赖项检查失败: {str(e)}")
return False
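# Note: check_dependencies() probes FFmpeg by running `ffmpeg -version` from PATH and
# test-imports cv2 and whisper; main_process() calls it before any heavy work starts.
# A standalone pre-flight check (illustrative) could be:
#
#   if not check_dependencies():
#       sys.exit("依赖检查未通过,请查看 process.log")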
# ======================== 全局配置 ========================
warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead")
# 使用相对路径
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MODEL_DIR = os.path.join(BASE_DIR, "models")
OUTPUT_DIR = os.path.join(BASE_DIR, "output")
# 创建必要的目录
os.makedirs(MODEL_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
# 其他配置保持不变
SSIM_THRESHOLD = 0.85 # 关键帧去重阈值
FRAME_INTERVAL = 2 # 抽帧间隔(秒)
TRANSITION_WORDS = ["接下来", "下一页", "如图"] # 过渡词过滤列
HOG_THRESHOLD = 0.7 # HOG特征相似度阈值
COLOR_THRESHOLD = 0.8 # 颜色直方图相似度阈值
WHISPER_MODEL = "small" # Whisper模型大小
PROFESSIONAL_TERMS = {
"人工智能": "AI",
"机器学习": "ML",
"深度学习": "DL",
"神经网络": "NN",
"卷积神经网络": "CNN",
"循环神经网络": "RNN",
"自然语言处理": "NLP",
"计算机视觉": "CV",
"大数据": "Big Data",
"云计算": "Cloud Computing"
} # 专业术语词典
# 性能优化配置
MAX_WORKERS = max(1, multiprocessing.cpu_count() - 1) # 并行处理的工作线程数
BATCH_SIZE = 15 # 增加批处理大小
CACHE_SIZE = 150 # 增加缓存大小
MEMORY_LIMIT = 0.8 # 内存使用限制(占总内存的比例)
TIMEOUT_SECONDS = 200 # 减少超时时间以加速处理流程
PROGRESS_UPDATE_INTERVAL = 1 # 进度更新间隔(秒)
MAX_KEYFRAMES = 30 # 最大关键帧数量限制,超过此数量将进行抽样
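# Tuning notes (observations on this file, hedged): MAX_WORKERS drives the ThreadPoolExecutor
# in ContentAligner.find_best_match, CACHE_SIZE sizes the lru_cache decorators, and
# TIMEOUT_SECONDS / PROGRESS_UPDATE_INTERVAL feed TimeoutHandler and ProgressTracker.
# Several constants (SSIM_THRESHOLD, HOG_THRESHOLD, COLOR_THRESHOLD, BATCH_SIZE,
# MEMORY_LIMIT, MAX_KEYFRAMES) do not appear to be referenced elsewhere in this file;
# extract_keyframes currently compares against a hard-coded 0.95 SSIM cutoff rather than
# SSIM_THRESHOLD.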
# ========================================================
# 进度跟踪类
class ProgressTracker:
def __init__(self, total_steps, description="处理中"):
self.total_steps = total_steps
self.current_step = 0
self.description = description
self.start_time = time.time()
self.last_update_time = self.start_time
self._lock = threading.Lock()
def update(self, step=1, message=None):
with self._lock:
self.current_step += step
current_time = time.time()
# 控制更新频率
if current_time - self.last_update_time >= PROGRESS_UPDATE_INTERVAL:
elapsed = current_time - self.start_time
progress = (self.current_step / self.total_steps) * 100
if message:
print(
f"[进度] {self.description}: {progress:.1f}% ({self.current_step}/{self.total_steps}) - {message}")
else:
print(f"[进度] {self.description}: {progress:.1f}% ({self.current_step}/{self.total_steps})")
self.last_update_time = current_time
def complete(self, message="完成"):
with self._lock:
elapsed = time.time() - self.start_time
print(f"[完成] {self.description}: 100% - {message} (耗时: {elapsed:.1f}秒)")
# 超时处理类
class TimeoutHandler:
def __init__(self, timeout_seconds=TIMEOUT_SECONDS):
self.timeout_seconds = timeout_seconds
self.timer = None
self._lock = threading.Lock()
def start(self, operation_name):
with self._lock:
if self.timer:
self.timer.cancel()
self.timer = threading.Timer(self.timeout_seconds, self._timeout_callback, args=[operation_name])
self.timer.start()
print(f"[信息] 开始{operation_name},超时时间: {self.timeout_seconds}")
def stop(self):
with self._lock:
if self.timer:
self.timer.cancel()
self.timer = None
def _timeout_callback(self, operation_name):
print(f"[警告] {operation_name}操作超时,正在尝试恢复...")
# 这里可以添加恢复逻辑
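# Minimal usage sketch for TimeoutHandler (illustrative). Note that _timeout_callback only
# prints a warning when the timer fires; it does not interrupt or cancel the running work.
#
#   watchdog = TimeoutHandler(timeout_seconds=60)
#   watchdog.start("示例操作")
#   ...  # long-running work
#   watchdog.stop()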
# ---------------------- 核心功能模块 ----------------------
class VideoProcessor:
def __init__(self):
# os.environ["PATH"] = FFMPEG_BIN + os.pathsep + os.environ["PATH"] # COMMENTED OUT/MODIFIED
if FFMPEG_BIN: # Only set if FFMPEG_BIN is not empty
ffmpeg_exe_path = os.path.join(FFMPEG_BIN, 'ffmpeg.exe')
print(f"--- Debug VideoProcessor --- Setting IMAGEIO_FFMPEG_EXE to: {repr(ffmpeg_exe_path)}") # DEBUG LINE ADDED
os.environ['IMAGEIO_FFMPEG_EXE'] = ffmpeg_exe_path
else:
print("--- Debug VideoProcessor --- FFMPEG_BIN is empty, not setting IMAGEIO_FFMPEG_EXE.") # DEBUG LINE ADDED
self.frame_cache = {}
self.feature_cache = {}
self._lock = threading.Lock()
self.timeout_handler = TimeoutHandler()
@staticmethod
def check_ffmpeg():
"""验证FFmpeg可用性"""
try:
subprocess.run(["ffmpeg", "-version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print("[系统] FFmpeg验证成功")
return True
except Exception as e:
print(f"[错误] FFmpeg验证失败: {str(e)}")
return False
@lru_cache(maxsize=CACHE_SIZE)
def calculate_color_histogram(self, frame_key):
"""计算颜色直方图特征(带缓存)"""
frame = self.frame_cache.get(frame_key)
if frame is None:
return None
hist = cv2.calcHist([frame], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
cv2.normalize(hist, hist)
return hist.flatten()
@lru_cache(maxsize=CACHE_SIZE)
def calculate_hog_features(self, frame_key):
"""计算HOG特征带缓存"""
frame = self.frame_cache.get(frame_key)
if frame is None:
return None
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
features = hog(gray, orientations=8, pixels_per_cell=(16, 16),
cells_per_block=(1, 1), visualize=False)
return features
@staticmethod
def is_ppt_transition(frame1, frame2):
"""检测PPT页面切换"""
# 转换为灰度图
gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
# 计算边缘
edges1 = cv2.Canny(gray1, 100, 200)
edges2 = cv2.Canny(gray2, 100, 200)
# 计算边缘差异
diff = cv2.absdiff(edges1, edges2)
return np.mean(diff) > 50 # 阈值可调整
@staticmethod
def is_blank_frame(frame, threshold=30):
"""检测是否为无信息帧(纯黑屏或纯白屏)"""
try:
# 转换为灰度图
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 计算图像统计特征
mean = np.mean(gray)
std_dev = np.std(gray)
# 检查是否为纯黑或纯白
is_black = mean < 10 and std_dev < 5
is_white = mean > 245 and std_dev < 5
# 检查是否有足够的细节
has_detail = std_dev > threshold
return is_black or is_white or not has_detail
except Exception as e:
print(f"[警告] 检查无信息帧时出错: {str(e)}")
return True
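    # Note: this staticmethod is shadowed by the instance method is_blank_frame defined
    # further down in this class (the variant with the `simplified` parameter); only that
    # later definition is actually bound on VideoProcessor.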
def process_frame_batch(self, frames_batch, start_idx):
"""处理一批帧"""
results = []
for i, frame in enumerate(frames_batch):
idx = start_idx + i
frame_key = f"frame_{idx}"
self.frame_cache[frame_key] = frame
results.append((idx, frame))
return results
def extract_keyframes(self, video_path: str) -> tuple:
"""提取去重关键帧及其时间戳(多特征融合,并行处理)"""
cap = None
try:
self.timeout_handler.start("关键帧提取")
logging.info(f"[Debug extract_keyframes] Original video_path type: {type(video_path)}, value: {repr(video_path)}")
# --- OpenCV VideoCapture for reading frames ---
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
logging.error(f"OpenCV: Failed to open video file: {video_path}")
raise ValueError(f"无法打开视频文件: {video_path}")
# 获取视频元数据
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps if fps > 0 else 0
if total_frames <= 0 or fps <= 0 or duration <= 0:
error_message = f"视频元数据读取不完整或无效: fps={fps}, total_frames={total_frames}, duration={duration}"
logging.error(error_message)
raise ValueError(error_message)
print(f"[信息] 视频元数据: 总帧数={total_frames}, 时长={duration:.2f}秒, FPS={fps:.2f}")
keyframes = []
timestamps = []
prev_frame_bgr = None
prev_frame_gray = None
frame_count = 0
progress = ProgressTracker(total_frames, "关键帧提取 (OpenCV)")
# 根据视频长度调整采样间隔
is_short_video = duration < 30
if is_short_video:
sample_interval_frames = max(int(fps * 0.5), 1) # 短视频每0.5秒采样一次
else:
sample_interval_frames = max(int(fps * 1.0), 1) # 长视频每1秒采样一次
logging.info(f"使用采样间隔: {sample_interval_frames}帧 (约{sample_interval_frames/fps:.1f}秒/帧)")
while cap.isOpened():
ret, frame_bgr = cap.read()
if not ret:
break
frame_count += 1
progress.update(1)
# 按采样间隔处理帧
if frame_count % sample_interval_frames != 0:
continue
# 检查是否为空白帧
if self.is_blank_frame(frame_bgr, simplified=True):
continue
# 转换为灰度图用于SSIM计算
frame_gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
# 与前一帧比较
if prev_frame_gray is not None:
# 计算SSIM
ssim_value = ssim(prev_frame_gray, frame_gray)
logging.debug(f"[Debug SSIM] Frame {frame_count}, SSIM: {ssim_value:.4f}")
if ssim_value > 0.95:
logging.info(f"[Debug SSIM] Frame {frame_count} is too similar to previous frame, skipping.")
continue
# 保存关键帧
frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
frame_pil = Image.fromarray(frame_rgb)
keyframes.append(frame_pil)
timestamps.append(frame_count / fps)
# 更新前一帧
prev_frame_bgr = frame_bgr.copy()
prev_frame_gray = frame_gray.copy()
# 如果没有提取到任何关键帧,进行强制采样
if not keyframes:
logging.warning("自动提取关键帧为0进行强制均匀采样")
cap.set(cv2.CAP_PROP_POS_FRAMES, 0) # 重置到视频开始
frame_count = 0
# 强制采样间隔
force_sample_interval = max(total_frames // 10, 1) # 至少提取10帧
while cap.isOpened():
ret, frame_bgr = cap.read()
if not ret:
break
frame_count += 1
if frame_count % force_sample_interval == 0:
frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
frame_pil = Image.fromarray(frame_rgb)
keyframes.append(frame_pil)
timestamps.append(frame_count / fps)
if not keyframes:
raise RuntimeError("关键帧提取失败:未能提取到任何关键帧")
print(f"[图像] 关键帧提取完成 (OpenCV),共{len(keyframes)}")
self.timeout_handler.stop()
progress.complete(f"提取了{len(keyframes)}个关键帧 (OpenCV)")
return keyframes, duration
except Exception as e:
logging.error(f"[错误] 关键帧提取失败 (OpenCV流程): {str(e)}")
logging.error(traceback.format_exc())
self.timeout_handler.stop()
return [], 0.0
finally:
if cap and cap.isOpened():
cap.release()
logging.info("OpenCV VideoCapture released in finally block.")
def _is_frame_different(self, frame1_bgr_np, frame2_bgr_np, simplified=False, threshold=0.8):
"""简化版本的帧差异检测. Expects BGR NumPy arrays from OpenCV."""
if simplified:
try:
gray1 = cv2.cvtColor(frame1_bgr_np, cv2.COLOR_BGR2GRAY)
gray2 = cv2.cvtColor(frame2_bgr_np, cv2.COLOR_BGR2GRAY)
diff = cv2.absdiff(gray1, gray2)
mean_diff = np.mean(diff)
# 降低阈值,使检测更敏感
required_mean_diff = threshold * 3.0 # 从7.5降低到3.0
logging.debug(f"[Debug Diff] mean_diff: {mean_diff:.2f}, required_mean_diff (threshold={threshold:.2f}*3.0): {required_mean_diff:.2f}")
return mean_diff > required_mean_diff
except Exception as e_diff_internal:
logging.error(f"Error in _is_frame_different simplified: {e_diff_internal}")
return True # On error, assume different to avoid losing a frame
else:
# 完整的特征比较逻辑 (当前未被调用,因为 simplified=True)
logging.warning("_is_frame_different called with simplified=False, but non-simplified path is not fully implemented.")
return True
def is_blank_frame(self, frame_bgr_np, simplified=False, threshold=20):
"""检测是否为无信息帧(支持简化版本). Expects BGR NumPy array from OpenCV."""
try:
gray = cv2.cvtColor(frame_bgr_np, cv2.COLOR_BGR2GRAY)
mean = np.mean(gray)
std_dev = np.std(gray)
if simplified: # Simplified version for main loop
# 调整阈值,使其更宽松
is_black = mean < 40 and std_dev < 20 # 从35/15调整到40/20
is_white = mean > 215 and std_dev < 20 # 从220/15调整到215/20
# 降低细节检测阈值
has_enough_detail = std_dev >= threshold * 0.8 # 降低阈值要求
is_actually_blank = (is_black or is_white) or not has_enough_detail
logging.debug(f"[Debug BlankS] mean: {mean:.2f}, std_dev: {std_dev:.2f}, threshold: {threshold}, is_black: {is_black}, is_white: {is_white}, has_enough_detail: {has_enough_detail}, result: {is_actually_blank}")
return is_actually_blank
else:
# Original more complex logic (currently not used by main path)
is_black_orig = mean < 10 and std_dev < 5
is_white_orig = mean > 245 and std_dev < 5
has_detail_orig = std_dev > threshold
is_actually_blank_orig = (is_black_orig or is_white_orig) or not has_detail_orig
logging.debug(f"[Debug BlankNS] mean: {mean:.2f}, std_dev: {std_dev:.2f}, threshold: {threshold}, is_black: {is_black_orig}, is_white: {is_white_orig}, has_detail: {has_detail_orig}, result: {is_actually_blank_orig}")
return is_actually_blank_orig
except Exception as e_blank_internal:
print(f"[警告] 检查无信息帧时出错 (OpenCV): {str(e_blank_internal)}")
logging.error(f"Error in is_blank_frame: {e_blank_internal}")
return False # On error, assume NOT blank to avoid wrongly discarding a frame
@staticmethod
def transcribe_audio(video_path: str, model_name: str = WHISPER_MODEL) -> list:
"""语音识别与时间戳获取(支持中英文混合,通过语言自动检测,并转换为简体中文)"""
try:
# 创建进度跟踪器
progress = ProgressTracker(100, "语音识别")
progress.update(10, "加载模型")
# 使用更大的模型提高准确率
model = whisper.load_model(model_name, device="cpu", download_root=MODEL_DIR)
progress.update(20, "开始转写")
logging.info(f"[Whisper] Starting transcription for: {video_path} with model: {model_name}. Language auto-detection ON.")
result = model.transcribe(
video_path,
fp16=False,
task="transcribe",
verbose=True,
initial_prompt=None
)
detected_language = result.get("language", "unknown")
logging.info(f"[Whisper] Transcription complete. Detected language: {detected_language}")
progress.update(60, f"处理转写结果 (语言: {detected_language})")
segments = result.get("segments", [])
if detected_language == 'zh':
logging.info("[Whisper] 检测到中文,将进行繁体到简体转换。")
try:
# 尝试不带 .json 后缀初始化,让库自行查找标准配置文件
converter = opencc.OpenCC('t2s') # <--- 修改点:移除了 .json
for i, seg in enumerate(segments):
original_text = seg['text']
simplified_text = converter.convert(original_text)
if original_text != simplified_text:
logging.debug(f"[OpenCC] 片段 {i} 转换: '{original_text[:30]}...' -> '{simplified_text[:30]}...'")
seg['text'] = simplified_text
logging.info("[OpenCC] 繁体到简体转换完成。")
except Exception as e_opencc:
logging.error(f"[OpenCC] 繁体到简体转换失败: {e_opencc}。将使用原始转录文本。")
logging.info("[Whisper] 应用中文专业术语替换。")
for i, seg in enumerate(segments):
text = seg["text"]
for cn, en in PROFESSIONAL_TERMS.items():
text = text.replace(cn, f"{cn}({en})")
seg["text"] = text
else:
logging.info(f"[Whisper] Detected language is {detected_language}. Skipping Chinese char conversion and professional terms replacement.")
if segments:
progress.update(30, f"已处理 {len(segments)} 个片段的文本转换与术语替换 (如果适用)")
else:
progress.update(30, "无语音片段进行文本转换或术语替换处理")
progress.complete(f"识别了{len(segments)}个语音片段")
return segments
except Exception as e:
print(f"[错误] 语音识别失败: {str(e)}")
return []
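# Illustrative sketch of how VideoProcessor is driven by main_process (hypothetical path):
#
#   processor = VideoProcessor()
#   keyframes, duration = processor.extract_keyframes("lecture.mp4")   # PIL images + seconds
#   segments = VideoProcessor.transcribe_audio("lecture.mp4")          # Whisper segments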
# ---------------------- 业务逻辑模块 ----------------------
class ContentAligner:
@staticmethod
def generate_page_intervals(timestamps: list, duration: float) -> list:
"""生成页面时间段"""
intervals = []
for i in range(len(timestamps)):
start = timestamps[i]
end = timestamps[i + 1] if i < len(timestamps) - 1 else duration
intervals.append((start, end))
return intervals
@staticmethod
@lru_cache(maxsize=CACHE_SIZE)
def calculate_text_similarity(text1: str, text2: str) -> float:
"""计算文本相似度(带缓存)"""
# 使用简单的词重叠度计算
words1 = set(re.findall(r'\w+', text1.lower()))
words2 = set(re.findall(r'\w+', text2.lower()))
if not words1 or not words2:
return 0.0
intersection = words1.intersection(words2)
union = words1.union(words2)
return len(intersection) / len(union)
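    # calculate_text_similarity is a plain Jaccard index over word-token sets:
    # J(A, B) = |A ∩ B| / |A ∪ B|. For example, "AI 模型 训练" vs "AI 模型 推理" share
    # 2 of 4 distinct tokens, giving 0.5. Since \w+ treats an unsegmented Chinese run as
    # a single token, scores for unspaced Chinese text tend to be coarse.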
@staticmethod
def _process_segment(seg, seg_start, intervals, all_segments):
"""处理单个语音片段(用于并行处理)"""
# 首先尝试时间戳匹配
for page_idx, (start, end) in enumerate(intervals):
if start <= seg_start < end:
return page_idx, seg
# 如果时间戳匹配失败,尝试文本相似度匹配
best_page = None
best_score = 0.0
for page_idx, (start, end) in enumerate(intervals):
# 获取该页面的所有文本
page_text = " ".join([s["text"] for s in all_segments if start <= s["start"] < end])
similarity = ContentAligner.calculate_text_similarity(seg["text"], page_text)
if similarity > best_score:
best_score = similarity
best_page = page_idx
if best_page is not None:
return best_page, seg
return None
@staticmethod
def _filter_repetitive_segments(segments: list, min_repeats_for_deduplication: int = 3) -> list:
"""过滤连续重复的语音片段。如果一个片段的文本连续重复N次或更多则只保留第一个实例。"""
if not segments:
return []
filtered_segments = []
i = 0
n = len(segments)
while i < n:
text_to_match = segments[i]['text']
# 计算当前文本连续重复的次数
count = 0
k = i
while k < n and segments[k]['text'] == text_to_match:
count += 1
k += 1
            if count < min_repeats_for_deduplication:  # 例如重复1或2次,保留所有
                filtered_segments.extend(segments[i : i + count])
            else:  # 例如重复3次或更多,只保留第一个
                filtered_segments.append(segments[i])  # 保留序列中的第一个片段
                logging.info(f"文本去重:'{text_to_match[:50]}...' 连续出现 {count} 次,已保留1次。原始首片段信息:Start={segments[i]['start']}, End={segments[i]['end']}")
i = k # 移动到下一个不同的文本块或列表末尾
return filtered_segments
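    # Worked example for _filter_repetitive_segments with the default threshold of 3:
    # texts [A, A, A, A, B, B] -> [A, B, B]; the run of four A's collapses to one,
    # while the run of two B's is below the threshold and is kept in full.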
@staticmethod
def find_best_match(segments: list, intervals: list) -> dict:
"""为每个语音片段找到最佳匹配的页面(并行处理)"""
page_texts = defaultdict(list)
unmatched_segments = []
# 创建进度跟踪器
progress = ProgressTracker(len(segments), "内容对齐")
# 使用线程池进行并行处理
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            # 记录 future 与对应语音片段的映射,便于在结果为 None 时找回原始片段
            future_to_seg = {}
            for seg in segments:
                seg_start = seg["start"]
                future = executor.submit(ContentAligner._process_segment, seg, seg_start, intervals, segments)
                future_to_seg[future] = seg
            # 收集结果
            for i, future in enumerate(concurrent.futures.as_completed(future_to_seg)):
                try:
                    result = future.result()
                    if result:
                        page_idx, seg = result
                        page_texts[page_idx].append(seg)
                    else:
                        # 修复:未匹配时记录该 future 对应的片段(原实现误用了循环变量 seg 的残留值)
                        unmatched_segments.append(future_to_seg[future])
                    progress.update(1, f"处理第{i + 1}/{len(segments)}个片段")
                except Exception as e:
                    print(f"[警告] 处理语音片段时出错: {str(e)}")
# 处理未匹配的片段
if unmatched_segments:
print(f"[警告] 发现{len(unmatched_segments)}个未匹配的语音片段")
# 将未匹配片段添加到最近的页面
for seg in unmatched_segments:
closest_page = min(range(len(intervals)),
key=lambda i: abs(seg["start"] - (intervals[i][0] + intervals[i][1]) / 2))
page_texts[closest_page].append(seg)
progress.complete(f"对齐了{len(segments)}个语音片段")
return page_texts
@staticmethod
def align_content(video_path: str, timestamps: list) -> list:
"""语音-画面对齐主逻辑(改进版,并行处理)"""
timeout_handler_align = None # Initialize for finally block
cap_align = None
try:
# 创建超时处理器
timeout_handler_align = TimeoutHandler()
timeout_handler_align.start("内容对齐")
# 获取视频时长 - Prefer OpenCV, then ffmpeg fallback (consistent with extract_keyframes)
duration = 0
cap_align = cv2.VideoCapture(video_path)
if cap_align.isOpened():
fps_align = cap_align.get(cv2.CAP_PROP_FPS)
total_frames_align = int(cap_align.get(cv2.CAP_PROP_FRAME_COUNT))
if fps_align > 0 and total_frames_align > 0:
duration = total_frames_align / fps_align
logging.info(f"[AlignContent] OpenCV获取视频时长: {duration:.2f}")
if duration <= 0 : # Fallback to ffmpeg CLI for duration
logging.warning("[AlignContent] OpenCV未能获取有效时长尝试FFmpeg CLI。")
try:
ffmpeg_exe_path = os.path.join(FFMPEG_BIN, 'ffmpeg.exe')
if not (FFMPEG_BIN and os.path.exists(ffmpeg_exe_path)):
ffmpeg_exe_path = 'ffmpeg'
process = subprocess.run([ffmpeg_exe_path, '-i', video_path], capture_output=True, text=True, timeout=30)
output_to_parse = process.stderr
                    duration_match = re.search(r'Duration: (\d{2}):(\d{2}):(\d{2}\.\d+)', output_to_parse)
if duration_match:
h, m, s = map(float, duration_match.groups())
duration = h * 3600 + m * 60 + s
logging.info(f"[AlignContent] FFmpeg CLI成功获取duration: {duration:.2f}")
else: # Try stdout
                        duration_match_stdout = re.search(r'Duration: (\d{2}):(\d{2}):(\d{2}\.\d+)', process.stdout)
if duration_match_stdout:
h, m, s = map(float, duration_match_stdout.groups())
duration = h * 3600 + m * 60 + s
logging.info(f"[AlignContent] FFmpeg CLI (stdout)成功获取duration: {duration:.2f}")
else:
logging.error("[AlignContent] FFmpeg CLI无法获取duration。将使用timestamps。")
except Exception as ff_exc_align:
logging.error(f"[AlignContent] FFmpeg CLI获取duration时出错: {ff_exc_align}。将使用timestamps。")
if duration <= 0: # If all attempts to get duration failed
if timestamps and len(timestamps) > 0 :
duration = timestamps[-1] + FRAME_INTERVAL # Estimate from last keyframe timestamp
logging.warning(f"[AlignContent] 无法获取精确视频时长,根据最后关键帧估算为: {duration:.2f}")
else: # Cannot determine duration at all
logging.error("[AlignContent] 无法确定视频时长,内容对齐可能不准确。")
# Potentially raise error or return empty if duration is critical
# For now, proceed with duration=0, which might lead to issues in generate_page_intervals
pass # Let it proceed with duration = 0, subsequent logic must handle
# 语音识别
segments = VideoProcessor.transcribe_audio(video_path)
if not segments:
logging.warning("未识别到语音内容,将生成空文本摘要")
segments = []
else:
original_segment_count = len(segments)
segments = ContentAligner._filter_repetitive_segments(segments) # 调用文本去重
if len(segments) < original_segment_count:
logging.info(f"语音片段去重处理完成:从 {original_segment_count} 个片段减少到 {len(segments)} 个片段。")
else:
logging.info("语音片段无需去重处理。")
# 生成页面时间间隔
intervals = ContentAligner.generate_page_intervals(timestamps, duration)
# 使用改进的匹配算法(并行处理)
page_texts = ContentAligner.find_best_match(segments, intervals)
# 生成最终的对齐数据
aligned_data = []
for idx in range(len(intervals)):
text = " ".join([seg["text"] for seg in page_texts.get(idx, [])])
aligned_data.append({
"page": idx,
"start_time": intervals[idx][0],
"end_time": intervals[idx][1],
"text": text if text else "未识别到相关语音内容"
})
# 停止超时处理
timeout_handler_align.stop()
return aligned_data
except Exception as e:
logging.error(f"内容对齐失败: {str(e)}")
logging.error(traceback.format_exc())
return []
finally:
if timeout_handler_align:
timeout_handler_align.stop()
if cap_align and cap_align.isOpened():
cap_align.release()
logging.info("[AlignContent] OpenCV VideoCapture released.")
# ---------------------- 摘要生成模块 ----------------------
class SummaryGenerator:
@staticmethod
def optimize_text(text: str) -> str:
"""优化文本内容"""
# 替换专业术语
for term, abbr in PROFESSIONAL_TERMS.items():
text = text.replace(term, f'<span class="professional-term">{term}</span> ({abbr})')
# 优化过渡词
for word in TRANSITION_WORDS:
text = text.replace(word, f'<span class="transition-word">{word}</span>')
return text
@staticmethod
def generate_html(aligned_data: list, keyframes: list, output_dir: str):
"""生成HTML格式的报告"""
# 创建临时目录用于存储图片
temp_img_dir = os.path.join(output_dir, "temp_images")
os.makedirs(temp_img_dir, exist_ok=True)
# 创建进度跟踪器
progress = ProgressTracker(len(aligned_data) + 1, "HTML报告生成")
# 创建超时处理器
timeout_handler = TimeoutHandler()
timeout_handler.start("HTML报告生成")
try:
# 检查输出目录权限
try:
# 尝试在输出目录创建测试文件以验证权限
test_file = os.path.join(output_dir, "test_write_permission.tmp")
with open(test_file, 'w') as f:
f.write("test")
os.remove(test_file)
logging.info(f"输出目录权限检查通过: {output_dir}")
except Exception as e:
logging.error(f"输出目录权限检查失败: {str(e)},尝试使用当前目录")
# 如果指定的输出目录不可写,则使用当前目录
output_dir = os.path.abspath(".")
temp_img_dir = os.path.join(output_dir, "temp_images")
os.makedirs(temp_img_dir, exist_ok=True)
logging.info(f"已切换到当前目录作为输出: {output_dir}")
# 性能优化:减小图片大小,加快处理
logging.info("优化图片尺寸以提高性能")
optimized_keyframes = []
for frame in keyframes:
# 限制图片最大尺寸为720p
if frame.width > 1280 or frame.height > 720:
aspect_ratio = frame.width / frame.height
if aspect_ratio > 16/9: # 宽屏
new_width = 1280
new_height = int(new_width / aspect_ratio)
else:
new_height = 720
new_width = int(new_height * aspect_ratio)
frame = frame.resize((new_width, new_height), Image.LANCZOS)
optimized_keyframes.append(frame)
keyframes = optimized_keyframes
logging.info("图片尺寸优化完成")
# 处理所有帧
pages_data = []
for idx, frame in enumerate(keyframes):
try:
page_data = SummaryGenerator._process_frame(idx, frame, aligned_data, temp_img_dir)
if page_data:
pages_data.append(page_data)
progress.update(1, f"处理第 {idx + 1}")
except Exception as e:
logging.error(f"处理帧 {idx} 时出错: {str(e)}")
logging.error(traceback.format_exc())
continue
# 检查是否有成功处理的页面
if not pages_data:
logging.error("没有成功处理任何页面无法生成HTML报告")
raise RuntimeError("没有成功处理任何页面无法生成HTML报告")
# 生成HTML模板
template = Environment().from_string("""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>PPT视频结构化摘要</title>
<style>
:root {
--primary-color: #2c3e50;
--secondary-color: #3498db;
--background-color: #f8f9fa;
--text-color: #333;
}
body {
font-family: 'Arial', sans-serif;
line-height: 1.6;
color: var(--text-color);
max-width: 1200px;
margin: 0 auto;
padding: 2rem;
background-color: var(--background-color);
}
h1 {
color: var(--primary-color);
text-align: center;
margin-bottom: 2rem;
}
.page {
background: white;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
margin-bottom: 2rem;
padding: 1.5rem;
opacity: 0;
transition: opacity 0.5s ease-in-out;
}
.page-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 1rem;
padding-bottom: 0.5rem;
border-bottom: 1px solid #eee;
}
.page-number {
font-weight: bold;
color: var(--secondary-color);
}
.timestamp {
color: #666;
font-size: 0.9rem;
}
.page-content {
display: flex;
gap: 2rem;
}
.image-container {
flex: 1;
min-width: 300px;
}
.image-container img {
width: 100%;
height: auto;
border-radius: 4px;
box-shadow: 0 1px 3px rgba(0,0,0,0.1);
}
.text-content {
flex: 2;
background: var(--background-color);
padding: 1.5rem;
border-radius: 5px;
font-size: 1.1rem;
line-height: 1.8;
}
.professional-term {
color: var(--secondary-color);
font-weight: bold;
}
.transition-word {
color: #e74c3c;
font-style: italic;
}
@media (max-width: 768px) {
.page-content {
flex-direction: column;
}
.image-container {
min-width: auto;
}
}
</style>
</head>
<body>
<h1>PPT视频结构化摘要</h1>
{% for page in pages %}
<div class="page">
<div class="page-header">
<div class="page-number">第 {{ page.num }} 页</div>
<div class="timestamp">时间区间:{{ page.time }}</div>
</div>
<div class="page-content">
<div class="image-container">
<img src="{{ page.image }}" alt="页面截图">
</div>
<div class="text-content">{{ page.text }}</div>
</div>
</div>
{% endfor %}
<script>
document.addEventListener('DOMContentLoaded', function() {
const pages = document.querySelectorAll('.page');
pages.forEach((page, index) => {
setTimeout(() => {
page.style.opacity = '1';
}, index * 100);
});
});
</script>
</body>
</html>
""")
# 保存HTML文件
output_path = os.path.join(output_dir, "summary.html")
try:
with open(output_path, "w", encoding="utf-8") as f:
f.write(template.render(pages=pages_data))
logging.info(f"HTML报告已生成: {output_path}")
# 检查文件是否已成功写入
if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
logging.info(f"HTML报告验证成功: {output_path},大小: {os.path.getsize(output_path)} 字节")
else:
logging.error(f"HTML报告生成失败: 文件不存在或为空: {output_path}")
raise IOError(f"HTML报告生成失败: 文件不存在或为空: {output_path}")
except Exception as e:
logging.error(f"HTML报告保存失败: {str(e)}")
# 尝试使用备用路径
backup_path = os.path.join(os.path.abspath("."), f"summary_{int(time.time())}.html")
logging.info(f"尝试使用备用路径保存HTML: {backup_path}")
with open(backup_path, "w", encoding="utf-8") as f:
f.write(template.render(pages=pages_data))
logging.info(f"HTML报告已使用备用路径生成: {backup_path}")
output_path = backup_path # 更新输出路径
# 停止超时处理
timeout_handler.stop()
progress.complete(f"HTML报告生成完成: {output_path}")
# 打印明确的文件位置信息以便用户查找
print(f"\n[重要] HTML报告已生成在: {os.path.abspath(output_path)}\n")
except Exception as e:
logging.error(f"HTML报告生成过程中发生错误: {str(e)}")
try:
logging.error(traceback.format_exc())
except Exception:
logging.error("无法获取详细错误信息traceback模块不可用")
# 停止超时处理
timeout_handler.stop()
raise
finally:
# 清理临时文件
try:
if os.path.exists(temp_img_dir):
for f in os.listdir(temp_img_dir):
try:
os.remove(os.path.join(temp_img_dir, f))
except Exception as e:
logging.error(f"删除临时图片文件失败: {str(e)}")
try:
os.rmdir(temp_img_dir)
logging.info("已删除临时图片目录")
except Exception as e:
logging.error(f"删除临时图片目录失败: {str(e)}")
except Exception as e:
logging.error(f"清理临时文件时出错: {str(e)}")
return output_path # 返回生成的HTML文件路径
@staticmethod
def _process_frame(idx, frame, aligned_data, temp_img_dir):
"""处理单个帧"""
try:
img_path = os.path.join(temp_img_dir, f"page_{idx}.jpg")
frame.save(img_path, quality=85)
with open(img_path, "rb") as f:
img_data = base64.b64encode(f.read()).decode("utf-8")
return {
"num": idx + 1,
"time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s",
"image": f"data:image/jpeg;base64,{img_data}",
"text": SummaryGenerator.optimize_text(aligned_data[idx]["text"])
}
except Exception as e:
logging.error(f"处理帧 {idx} 时出错: {str(e)}")
return None
@staticmethod
def generate_pdf(aligned_data: list, keyframes: list, output_dir: str):
"""生成PDF格式的报告"""
# 首先生成HTML文件
html_path = os.path.join(output_dir, "summary.html")
if not os.path.exists(html_path):
SummaryGenerator.generate_html(aligned_data, keyframes, output_dir)
# 创建进度跟踪器
progress = ProgressTracker(1, "PDF报告生成")
# 创建超时处理器
timeout_handler = TimeoutHandler()
timeout_handler.start("PDF报告生成")
try:
logging.info("开始将HTML转换为PDF...")
# 设置PDF配置选项
options = {
'page-size': 'A4',
'margin-top': '0.75in',
'margin-right': '0.75in',
'margin-bottom': '0.75in',
'margin-left': '0.75in',
'encoding': 'UTF-8',
'no-outline': None,
'quiet': ''
}
# 生成PDF文件路径
pdf_path = os.path.join(output_dir, "summary.pdf")
# 使用pdfkit生成PDF
try:
pdfkit.from_file(html_path, pdf_path, options=options)
logging.info(f"PDF报告已生成: {pdf_path}")
# 停止超时处理
timeout_handler.stop()
progress.complete("PDF报告生成完成")
return True
except Exception as e:
logging.error(f"PDF生成失败: {str(e)}")
return False
except Exception as e:
logging.error(f"PDF报告生成过程出错: {str(e)}")
timeout_handler.stop()
return False
@classmethod
def generate_all(cls, aligned_data: list, keyframes: list, output_dir: str):
"""生成所有格式报告"""
try:
# 首先生成HTML报告
html_path = cls.generate_html(aligned_data, keyframes, output_dir)
# 输出明确的报告位置提示
print(f"\n[完成] 报告生成成功!\n")
print(f"HTML报告地址: {os.path.abspath(html_path)}")
# 尝试生成PDF报告
pdf_success = False
try:
# 检查pdfkit模块是否可用
                if 'pdfkit_patch' in sys.modules:  # 本文件实际导入的是 pdfkit_patch(别名 pdfkit)
                    pdf_success = cls.generate_pdf(aligned_data, keyframes, output_dir)
                else:
                    logging.info("pdfkit模块不可用,跳过PDF生成")
except Exception as e:
logging.error(f"PDF报告生成失败: {str(e)}")
if not pdf_success:
logging.warning("PDF生成功能不可用或生成失败仅生成HTML报告")
return True
except Exception as e:
logging.error(f"报告生成出错: {str(e)}")
logging.error(traceback.format_exc())
# 创建一个极简的报告,以确保用户至少能看到一些结果
try:
fallback_path = os.path.join(os.path.abspath("."), "emergency_report.html")
with open(fallback_path, "w", encoding="utf-8") as f:
f.write(f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>应急报告</title>
</head>
<body>
<h1>视频处理完成,但报告生成失败</h1>
<p>处理过程中发生了以下错误:</p>
<pre>{str(e)}</pre>
<p>请查看日志文件以获取更多信息。</p>
</body>
</html>
""")
print(f"\n[警告] 正常报告生成失败,已创建应急报告: {fallback_path}\n")
except Exception:
logging.error("创建应急报告也失败了")
return False
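# Illustrative sketch of report generation (assumes keyframes are PIL.Image objects and
# aligned_data comes from ContentAligner.align_content):
#
#   SummaryGenerator.generate_all(aligned_data, keyframes, OUTPUT_DIR)
#   # -> writes OUTPUT_DIR/summary.html with base64-embedded screenshots; PDF output is
#   #    attempted only when the pdfkit patch module was imported successfully.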
# ---------------------- 主流程控制 ----------------------
def main_process(video_path, output_dir=None, progress_callback=None):
try:
logging.info(f"开始处理视频文件: {video_path}")
# 设置输出目录
if output_dir is None:
output_dir = OUTPUT_DIR
# 检查输出目录是否存在,如果不存在则创建
try:
os.makedirs(output_dir, exist_ok=True)
logging.info(f"使用输出目录: {output_dir}")
# 检查输出目录权限
test_file = os.path.join(output_dir, "test_permission.tmp")
with open(test_file, "w") as f:
f.write("test")
os.remove(test_file)
except Exception as e:
logging.error(f"输出目录异常: {str(e)},使用当前目录作为替代")
output_dir = os.path.abspath(".")
os.makedirs(output_dir, exist_ok=True)
logging.info(f"已切换到当前目录: {output_dir}")
# 进度回调函数
def update_progress(progress, message=None):
if progress_callback:
try:
progress_callback(progress, message)
except Exception as e:
logging.error(f"进度回调函数执行失败: {str(e)}")
logging.info(f"处理进度: {progress}% - {message if message else ''}")
# 初始化进度
update_progress(0, "开始处理视频")
# 检查视频文件是否存在
if not os.path.exists(video_path):
error_msg = f"视频文件不存在: {video_path}"
logging.error(error_msg)
update_progress(0, f"错误: {error_msg}")
raise FileNotFoundError(error_msg)
# 检查文件大小
file_size = os.path.getsize(video_path) / (1024 * 1024) # 转换为MB
logging.info(f"视频文件大小: {file_size:.2f}MB")
# 检查文件是否为空
if file_size == 0:
error_msg = "视频文件为空"
logging.error(error_msg)
update_progress(0, f"错误: {error_msg}")
raise ValueError(error_msg)
# 检查文件是否可读
try:
with open(video_path, 'rb') as f:
f.read(1024) # 尝试读取一小块数据
except Exception as e:
error_msg = f"视频文件无法读取: {str(e)}"
logging.error(error_msg)
update_progress(0, f"错误: {error_msg}")
raise IOError(error_msg)
# 检查依赖项
update_progress(5, "检查系统依赖")
if not check_dependencies():
error_msg = "依赖项检查失败"
logging.error(error_msg)
update_progress(5, f"错误: {error_msg}")
raise RuntimeError(f"{error_msg},请检查日志获取详细信息")
update_progress(10, "依赖项检查通过")
# 初始化视频处理器
processor = VideoProcessor()
# 提取关键帧
logging.info("开始提取关键帧...")
update_progress(15, "开始提取关键帧")
try:
keyframes, duration = processor.extract_keyframes(video_path)
if not keyframes:
error_msg = "关键帧提取失败:未能提取到任何关键帧"
logging.error(error_msg)
update_progress(15, f"错误: 未能提取到关键帧")
raise RuntimeError(error_msg)
logging.info(f"成功提取 {len(keyframes)} 个关键帧,视频时长:{duration:.2f}")
update_progress(40, f"已提取 {len(keyframes)} 个关键帧")
except Exception as e:
error_msg = f"关键帧提取过程出错: {str(e)}"
logging.error(error_msg)
try:
logging.error(traceback.format_exc())
except Exception:
logging.error("无法获取详细错误信息traceback模块不可用")
update_progress(15, f"错误: 关键帧提取失败 - {str(e)}")
raise RuntimeError(error_msg)
# 转录音频
logging.info("开始转录音频...")
update_progress(45, "开始转录音频")
try:
segments = VideoProcessor.transcribe_audio(video_path)
if not segments:
logging.warning("音频转录失败:未能识别到任何语音内容")
update_progress(45, "警告: 未识别到语音内容,将生成空文本摘要")
segments = []
else:
logging.info(f"成功转录 {len(segments)} 个音频片段")
update_progress(65, f"已转录 {len(segments)} 个音频片段")
for i, seg in enumerate(segments[:3], 1): # 只记录前三个片段作为示例
logging.debug(f"音频片段 {i}: {seg['text'][:50]}...")
except Exception as e:
error_msg = f"音频转录过程出错: {str(e)}"
logging.error(error_msg)
try:
logging.error(traceback.format_exc())
except Exception:
logging.error("无法获取详细错误信息traceback模块不可用")
update_progress(45, f"错误: 音频转录失败 - {str(e)}")
raise RuntimeError(error_msg)
# 计算时间戳
timestamps = [0] # 添加起始时间戳
for frame_idx, frame in enumerate(keyframes[1:], 1):
timestamps.append(frame_idx * duration / len(keyframes))
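        # Note: extract_keyframes() computes real per-frame timestamps internally but does not
        # return them, so page boundaries here are approximated by spacing the keyframes
        # uniformly across the video duration.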
# 对齐内容
logging.info("开始对齐内容...")
update_progress(70, "开始对齐内容")
try:
aligned_data = ContentAligner.align_content(video_path, timestamps)
if not aligned_data:
error_msg = "内容对齐失败:未能生成对齐数据"
logging.error(error_msg)
update_progress(70, "错误: 内容对齐失败")
# 创建一个空的对齐数据,以便能继续生成报告
aligned_data = []
for i in range(len(keyframes)):
aligned_data.append({
"page": i,
"start_time": timestamps[i],
"end_time": timestamps[i+1] if i < len(timestamps)-1 else duration,
"text": "未能识别到相关语音内容"
})
logging.info(f"已创建{len(aligned_data)}个空内容对齐数据")
update_progress(75, "使用空内容继续处理")
else:
logging.info(f"成功对齐 {len(aligned_data)} 个内容片段")
update_progress(80, f"已对齐 {len(aligned_data)} 个内容片段")
for i, data in enumerate(aligned_data[:3], 1): # 只记录前三个对齐结果作为示例
logging.debug(f"对齐片段 {i}: {data.get('start_time', 'N/A')}s - {data.get('end_time', 'N/A')}s")
except Exception as e:
error_msg = f"内容对齐过程出错: {str(e)}"
logging.error(error_msg)
try:
logging.error(traceback.format_exc())
except Exception:
logging.error("无法获取详细错误信息traceback模块不可用")
update_progress(70, f"错误: 内容对齐失败 - {str(e)}")
# 创建一个空的对齐数据,以便能继续生成报告
aligned_data = []
for i in range(len(keyframes)):
aligned_data.append({
"page": i,
"start_time": timestamps[i],
"end_time": timestamps[i+1] if i < len(timestamps)-1 else duration,
"text": "未能识别到相关语音内容"
})
logging.info(f"已创建{len(aligned_data)}个空内容对齐数据")
update_progress(75, "使用空内容继续处理")
# 生成总结
logging.info("开始生成总结...")
update_progress(85, "开始生成报告")
try:
if SummaryGenerator.generate_all(aligned_data, keyframes, output_dir):
logging.info(f"总结生成完成,输出目录: {output_dir}")
update_progress(100, "处理完成")
# 检查HTML文件是否存在
html_path = os.path.join(output_dir, "summary.html")
if os.path.exists(html_path):
logging.info(f"报告验证成功: {html_path}")
print(f"\n[成功] 报告生成完成,位置: {os.path.abspath(html_path)}\n")
else:
logging.warning(f"报告文件不存在: {html_path}")
print(f"\n[警告] 处理似乎完成但未找到报告文件,请检查日志\n")
else:
error_msg = "报告生成失败"
logging.error(error_msg)
update_progress(85, f"错误: {error_msg}")
raise RuntimeError(error_msg)
except Exception as e:
error_msg = f"总结生成过程出错: {str(e)}"
logging.error(error_msg)
try:
logging.error(traceback.format_exc())
except Exception:
logging.error("无法获取详细错误信息traceback模块不可用")
update_progress(85, f"错误: 报告生成失败 - {str(e)}")
# 尝试创建一个简单的报告
try:
simple_html = os.path.join(output_dir, "simple_report.html")
with open(simple_html, "w", encoding="utf-8") as f:
f.write(f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>简单报告</title>
<style>
body {{ font-family: Arial, sans-serif; max-width: 1200px; margin: 0 auto; padding: 20px; }}
img {{ max-width: 100%; height: auto; }}
.frame {{ margin-bottom: 30px; border: 1px solid #eee; padding: 15px; }}
</style>
</head>
<body>
<h1>视频简单报告</h1>
<p>完整报告生成失败,这是一个简化版本</p>
""")
# 添加关键帧
for i, frame in enumerate(keyframes):
# 保存图片
img_path = os.path.join(output_dir, f"frame_{i}.jpg")
frame.save(img_path)
# 添加到HTML
f.write(f"""
<div class="frame">
<h2>第 {i+1} 帧</h2>
<img src="frame_{i}.jpg" alt="关键帧 {i+1}">
</div>
""")
f.write("</body></html>")
logging.info(f"简单报告已生成: {simple_html}")
print(f"\n[恢复] 创建了简单报告: {os.path.abspath(simple_html)}\n")
except Exception as inner_e:
logging.error(f"简单报告生成也失败了: {str(inner_e)}")
raise RuntimeError(error_msg)
logging.info("所有处理步骤已完成")
return True
except Exception as e:
logging.error(f"处理过程中发生错误: {str(e)}")
logging.error("详细错误信息:")
try:
logging.error(traceback.format_exc())
except Exception:
logging.error("无法获取详细错误信息traceback模块不可用")
if progress_callback:
try:
progress_callback(0, f"处理失败: {str(e)}")
except:
pass
print(f"\n[错误] 处理失败: {str(e)}\n")
return False
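# Illustrative programmatic entry point (hypothetical path and callback):
#
#   def on_progress(pct, msg=None):
#       print(f"{pct}% {msg or ''}")
#
#   main_process(r"D:\videos\lecture.mp4", output_dir=OUTPUT_DIR, progress_callback=on_progress)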
if __name__ == "__main__":
try:
if len(sys.argv) < 2:
print("使用方法: python 毕设.py <视频文件路径>")
sys.exit(1)
video_path = sys.argv[1]
if main_process(video_path):
print("[完成] 处理成功")
sys.exit(0)
else:
print("[错误] 处理失败,请查看日志文件了解详情")
sys.exit(1)
except KeyboardInterrupt:
print("\n[中断] 用户中断了处理")
sys.exit(130)
except Exception as e:
print(f"[错误] 程序执行过程中出现未处理的异常: {str(e)}")
try:
traceback.print_exc()
except Exception:
print("无法打印详细错误信息traceback模块不可用")
sys.exit(1)