PPT/8.0/毕设.py

1469 lines
62 KiB
Python
Raw Normal View History

2025-05-14 17:49:37 +08:00
import os
import re
import base64
import warnings
import sys # Add sys import for debugging
# --- Manually add D:\Lib\site-packages to sys.path ---
site_packages_path = r'D:\Lib\site-packages'
if site_packages_path not in sys.path:
sys.path.append(site_packages_path)
# --- End of manual addition ---
print(f"--- Debug --- Attempting to import imageio in: {__file__}") # Debug print
print(f"--- Debug --- Python executable: {sys.executable}") # Debug print
print(f"--- Debug --- sys.path AFTER manual add: {sys.path}") # Debug print, note the change in message
try: # Debug block
import imageio as test_imageio_module
print(f"--- Debug --- Found 'imageio' at: {test_imageio_module.__file__}")
print(f"--- Debug --- Version of 'imageio': {test_imageio_module.__version__}")
except ImportError as e:
print(f"--- Debug --- ImportError for imageio: {e}")
except AttributeError: # Handle cases where __file__ or __version__ might be missing
print(f"--- Debug --- Found 'imageio', but cannot get __file__ or __version__.")
# The original import line
import imageio
import whisper
import numpy as np
from PIL import Image
from skimage.metrics import structural_similarity as ssim
from collections import defaultdict
import subprocess
from jinja2 import Environment
import cv2
from scipy.signal import find_peaks
from skimage.feature import hog
from skimage.color import rgb2gray
import concurrent.futures
import threading
import queue
import time
import gc
from functools import lru_cache
import multiprocessing
import signal
import traceback
import logging
import json
import shutil
import importlib
# 导入补丁模块 - 用于解决wkhtmltopdf依赖问题
try:
import pdfkit_patch as pdfkit
logging.info("已加载pdfkit补丁模块")
except ImportError:
logging.info("未找到pdfkit补丁模块PDF生成功能可能不可用")
# 设置环境变量,使用 OpenBLAS
os.environ['OPENBLAS_NUM_THREADS'] = '1'
os.environ['MKL_NUM_THREADS'] = '1'
os.environ['NUMEXPR_NUM_THREADS'] = '1'
os.environ['OMP_NUM_THREADS'] = '1'
# FFmpeg路径配置
FFMPEG_BIN = os.path.join(os.path.dirname(os.path.abspath(__file__)), "ffmpeg", "bin")
if not os.path.exists(FFMPEG_BIN):
FFMPEG_BIN = "" # 如果目录不存在使用系统环境变量中的FFmpeg
# 配置日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('process.log', encoding='utf-8'),
logging.StreamHandler()
]
)
def check_dependencies():
try:
# 检查FFmpeg
try:
subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
logging.info("FFmpeg 检查通过")
except Exception as e:
logging.error(f"FFmpeg 检查失败: {str(e)}")
return False
# 检查OpenCV
try:
import cv2
logging.info("OpenCV 检查通过")
except Exception as e:
logging.error(f"OpenCV 检查失败: {str(e)}")
return False
# 检查Whisper
try:
import whisper
logging.info("Whisper 检查通过")
except Exception as e:
logging.error(f"Whisper 检查失败: {str(e)}")
return False
# 注意: wkhtmltopdf检查已禁用
# 使用pdfkit_patch模块解决wkhtmltopdf依赖问题
logging.info("wkhtmltopdf检查已禁用仅生成HTML报告")
logging.info("所有依赖项检查通过")
return True
except Exception as e:
logging.error(f"依赖项检查失败: {str(e)}")
return False
# ======================== 全局配置 ========================
warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead")
# 使用相对路径
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MODEL_DIR = os.path.join(BASE_DIR, "models")
OUTPUT_DIR = os.path.join(BASE_DIR, "output")
# 创建必要的目录
os.makedirs(MODEL_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
# 其他配置保持不变
SSIM_THRESHOLD = 0.85 # 关键帧去重阈值
FRAME_INTERVAL = 2 # 抽帧间隔(秒)
TRANSITION_WORDS = ["接下来", "下一页", "如图"] # 过渡词过滤列
HOG_THRESHOLD = 0.7 # HOG特征相似度阈值
COLOR_THRESHOLD = 0.8 # 颜色直方图相似度阈值
WHISPER_MODEL = "base" # Whisper模型大小
PROFESSIONAL_TERMS = {
"人工智能": "AI",
"机器学习": "ML",
"深度学习": "DL",
"神经网络": "NN",
"卷积神经网络": "CNN",
"循环神经网络": "RNN",
"自然语言处理": "NLP",
"计算机视觉": "CV",
"大数据": "Big Data",
"云计算": "Cloud Computing"
} # 专业术语词典
# 性能优化配置
MAX_WORKERS = max(1, multiprocessing.cpu_count() - 1) # 并行处理的工作线程数
BATCH_SIZE = 15 # 增加批处理大小
CACHE_SIZE = 150 # 增加缓存大小
MEMORY_LIMIT = 0.8 # 内存使用限制(占总内存的比例)
TIMEOUT_SECONDS = 200 # 减少超时时间以加速处理流程
PROGRESS_UPDATE_INTERVAL = 1 # 进度更新间隔(秒)
MAX_KEYFRAMES = 30 # 最大关键帧数量限制,超过此数量将进行抽样
MIN_KEYFRAMES = 5 # 最小关键帧数量要求,少于此数量将强制提取
# ========================================================
# 进度跟踪类
class ProgressTracker:
def __init__(self, total_steps, description="处理中"):
self.total_steps = total_steps
self.current_step = 0
self.description = description
self.start_time = time.time()
self.last_update_time = self.start_time
self._lock = threading.Lock()
def update(self, step=1, message=None):
with self._lock:
self.current_step += step
current_time = time.time()
# 控制更新频率
if current_time - self.last_update_time >= PROGRESS_UPDATE_INTERVAL:
elapsed = current_time - self.start_time
progress = (self.current_step / self.total_steps) * 100
if message:
print(
f"[进度] {self.description}: {progress:.1f}% ({self.current_step}/{self.total_steps}) - {message}")
else:
print(f"[进度] {self.description}: {progress:.1f}% ({self.current_step}/{self.total_steps})")
self.last_update_time = current_time
def complete(self, message="完成"):
with self._lock:
elapsed = time.time() - self.start_time
print(f"[完成] {self.description}: 100% - {message} (耗时: {elapsed:.1f}秒)")
# 超时处理类
class TimeoutHandler:
def __init__(self, timeout_seconds=TIMEOUT_SECONDS):
self.timeout_seconds = timeout_seconds
self.timer = None
self._lock = threading.Lock()
def start(self, operation_name):
with self._lock:
if self.timer:
self.timer.cancel()
self.timer = threading.Timer(self.timeout_seconds, self._timeout_callback, args=[operation_name])
self.timer.start()
print(f"[信息] 开始{operation_name},超时时间: {self.timeout_seconds}")
def stop(self):
with self._lock:
if self.timer:
self.timer.cancel()
self.timer = None
def _timeout_callback(self, operation_name):
print(f"[警告] {operation_name}操作超时,正在尝试恢复...")
# 这里可以添加恢复逻辑
# ---------------------- 核心功能模块 ----------------------
class VideoProcessor:
def __init__(self):
os.environ["PATH"] = FFMPEG_BIN + os.pathsep + os.environ["PATH"]
self.frame_cache = {}
self.feature_cache = {}
self._lock = threading.Lock()
self.timeout_handler = TimeoutHandler()
@staticmethod
def check_ffmpeg():
"""验证FFmpeg可用性"""
try:
subprocess.run(["ffmpeg", "-version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print("[系统] FFmpeg验证成功")
return True
except Exception as e:
print(f"[错误] FFmpeg验证失败: {str(e)}")
return False
@lru_cache(maxsize=CACHE_SIZE)
def calculate_color_histogram(self, frame_key):
"""计算颜色直方图特征(带缓存)"""
frame = self.frame_cache.get(frame_key)
if frame is None:
return None
hist = cv2.calcHist([frame], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
cv2.normalize(hist, hist)
return hist.flatten()
@lru_cache(maxsize=CACHE_SIZE)
def calculate_hog_features(self, frame_key):
"""计算HOG特征带缓存"""
frame = self.frame_cache.get(frame_key)
if frame is None:
return None
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
features = hog(gray, orientations=8, pixels_per_cell=(16, 16),
cells_per_block=(1, 1), visualize=False)
return features
@staticmethod
def is_ppt_transition(frame1, frame2):
"""检测PPT页面切换"""
# 转换为灰度图
gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
# 计算边缘
edges1 = cv2.Canny(gray1, 100, 200)
edges2 = cv2.Canny(gray2, 100, 200)
# 计算边缘差异
diff = cv2.absdiff(edges1, edges2)
return np.mean(diff) > 50 # 阈值可调整
@staticmethod
def is_blank_frame(frame, threshold=30):
"""检测是否为无信息帧(纯黑屏或纯白屏)"""
try:
# 转换为灰度图
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 计算图像统计特征
mean = np.mean(gray)
std_dev = np.std(gray)
# 检查是否为纯黑或纯白
is_black = mean < 10 and std_dev < 5
is_white = mean > 245 and std_dev < 5
# 检查是否有足够的细节
has_detail = std_dev > threshold
return is_black or is_white or not has_detail
except Exception as e:
print(f"[警告] 检查无信息帧时出错: {str(e)}")
return True
def process_frame_batch(self, frames_batch, start_idx):
"""处理一批帧"""
results = []
for i, frame in enumerate(frames_batch):
idx = start_idx + i
frame_key = f"frame_{idx}"
self.frame_cache[frame_key] = frame
results.append((idx, frame))
return results
def extract_keyframes(self, video_path: str) -> tuple:
"""提取去重关键帧及其时间戳(多特征融合,并行处理)"""
try:
self.timeout_handler.start("关键帧提取")
reader = imageio.get_reader(video_path)
fps = reader.get_meta_data()["fps"]
total_frames = reader.count_frames()
duration = reader.get_meta_data().get("duration", total_frames / fps)
print(f"[信息] 视频总帧数: {total_frames}, 时长: {duration:.2f}")
keyframes = []
timestamps = []
prev_frame = None
frame_count = 0
# 创建进度跟踪器
progress = ProgressTracker(total_frames, "关键帧提取")
# 设置最后处理帧的阈值和超时
last_frames_threshold = 30 # 增加到30帧
last_frame_time = time.time()
last_frame_timeout = 10 # 降低到10秒超时
# 批处理大小动态调整
current_batch_size = BATCH_SIZE
# 使用队列存储结果
result_queue = queue.Queue()
# 最后阶段的简化处理标志
simplified_processing = False
# 短视频处理标志 - 小于30秒的视频被视为短视频
is_short_video = duration < 30
if is_short_video:
logging.info(f"检测到短视频 ({duration:.2f}秒),使用密集采样模式")
# 短视频采样间隔减少,确保能捕获足够帧
sample_interval = max(int(fps * 0.5), 1) # 每0.5秒一帧
else:
# 优化:计算抽样间隔
# 如果视频很长,增加抽样间隔
if total_frames > fps * 60 * 10: # 10分钟以上的视频
sample_interval = max(int(fps * 3), 1) # 每3秒抽取一帧
logging.info(f"视频较长,使用增大抽样间隔: {sample_interval}")
else:
sample_interval = max(int(fps * FRAME_INTERVAL), 1) # 使用默认间隔
logging.info(f"使用抽样间隔: {sample_interval}帧 (约{sample_interval/fps:.1f}秒/帧)")
# 使用线程池进行并行处理
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = []
frames_batch = []
batch_start_idx = 0
try:
# 修改为按间隔抽取帧
# 读取所有帧,短视频使用更密集采样
for idx, frame in enumerate(reader):
# 更新进度
progress.update(1)
# 只处理符合抽样间隔的帧
if not is_short_video and idx % sample_interval != 0:
continue
elif is_short_video and idx % sample_interval != 0:
# 短视频也按间隔处理,但间隔更小
continue
# 检查是否接近结束
if idx >= total_frames - last_frames_threshold:
if not simplified_processing:
print("[信息] 进入最后阶段,启用简化处理模式")
simplified_processing = True
# 清理现有资源
self.frame_cache.clear()
self.feature_cache.clear()
gc.collect()
current_time = time.time()
if current_time - last_frame_time > last_frame_timeout:
print(f"[警告] 处理最后{last_frames_threshold}帧时卡住,跳过剩余帧")
# 强制处理当前批次
if frames_batch:
future = executor.submit(self.process_frame_batch, frames_batch, batch_start_idx)
futures.append(future)
break
# 在最后阶段使用最小批处理大小
current_batch_size = 1
last_frame_time = current_time
curr_time = idx / fps
# 检查是否为无信息帧(短视频时使用宽松标准)
if not is_short_video and self.is_blank_frame(frame, simplified=True):
continue
elif is_short_video and self.is_blank_frame(frame, threshold=50): # 短视频使用更宽松的阈值
continue
frames_batch.append(frame)
# 当批次达到指定大小时提交处理
if len(frames_batch) >= current_batch_size:
future = executor.submit(self.process_frame_batch, frames_batch, batch_start_idx)
futures.append(future)
batch_start_idx += len(frames_batch)
frames_batch = []
# 及时清理完成的future
self._clean_completed_futures(futures, result_queue)
# 强制垃圾回收
if frame_count % 20 == 0: # 更频繁的垃圾回收
gc.collect()
# 处理剩余的帧
if frames_batch:
future = executor.submit(self.process_frame_batch, frames_batch, batch_start_idx)
futures.append(future)
# 等待所有future完成但设置更短的超时
try:
for future in concurrent.futures.as_completed(futures, timeout=15):
try:
batch_results = future.result(timeout=3) # 更短的超时
for idx, frame in batch_results:
result_queue.put((idx, frame))
except Exception as e:
print(f"[警告] 处理批次时出错: {str(e)}")
except concurrent.futures.TimeoutError:
print("[警告] 部分批次处理超时,继续处理已完成的结果")
except Exception as e:
print(f"[警告] 帧处理过程中出错: {str(e)}")
finally:
# 处理队列中的所有结果
while not result_queue.empty():
try:
idx, frame = result_queue.get_nowait()
curr_time = idx / fps
# 使用简化版本的特征比较(短视频降低相似度阈值)
if prev_frame is not None:
try:
similarity_threshold = 0.6 if is_short_video else 0.8
if not self._is_frame_different(prev_frame, frame, simplified=True, threshold=similarity_threshold):
continue
except Exception as e:
print(f"[警告] 特征比较失败: {str(e)}")
continue
keyframes.append(Image.fromarray(frame))
timestamps.append(curr_time)
prev_frame = frame
frame_count += 1
# 在最后阶段更频繁地清理资源
if simplified_processing and frame_count % 5 == 0:
gc.collect()
except queue.Empty:
break
reader.close()
print(f"[图像] 关键帧提取完成,共{len(keyframes)}")
# 检查是否达到最小关键帧要求
if len(keyframes) < MIN_KEYFRAMES and total_frames > 0:
logging.info(f"检测到关键帧数量不足({len(keyframes)}<{MIN_KEYFRAMES}),进行强制提取")
# 重新打开视频并直接均匀采样
try:
reader = imageio.get_reader(video_path)
# 计算均匀采样点
sample_points = [int(i * total_frames / MIN_KEYFRAMES) for i in range(MIN_KEYFRAMES)]
# 清空现有关键帧
keyframes = []
timestamps = []
for i, frame_idx in enumerate(sample_points):
try:
# 跳到指定帧
frame = reader.get_data(frame_idx)
keyframes.append(Image.fromarray(frame))
timestamps.append(frame_idx / fps)
logging.info(f"强制提取第{i+1}个关键帧: 帧索引={frame_idx}, 时间={frame_idx/fps:.2f}")
except Exception as e:
logging.error(f"强制提取关键帧失败: {str(e)}")
reader.close()
logging.info(f"强制提取完成,共{len(keyframes)}")
except Exception as e:
logging.error(f"强制提取关键帧过程出错: {str(e)}")
# 优化:限制最大关键帧数量,通过均匀采样减少
if len(keyframes) > MAX_KEYFRAMES:
logging.info(f"关键帧数量({len(keyframes)})超过限制({MAX_KEYFRAMES}),进行抽样")
# 计算采样间隔
sample_rate = len(keyframes) / MAX_KEYFRAMES
sampled_keyframes = []
sampled_timestamps = []
# 均匀采样
for i in range(MAX_KEYFRAMES):
idx = min(int(i * sample_rate), len(keyframes) - 1)
sampled_keyframes.append(keyframes[idx])
sampled_timestamps.append(timestamps[idx])
keyframes = sampled_keyframes
timestamps = sampled_timestamps
logging.info(f"抽样后关键帧数量: {len(keyframes)}")
# 清理资源
self.frame_cache.clear()
self.feature_cache.clear()
gc.collect()
# 停止超时处理
self.timeout_handler.stop()
progress.complete(f"提取了{len(keyframes)}个关键帧")
return keyframes, duration
except Exception as e:
print(f"[错误] 关键帧提取失败: {str(e)}")
self.timeout_handler.stop()
return [], 0.0
def _clean_completed_futures(self, futures, result_queue):
"""清理已完成的future并存储结果"""
done = []
for future in futures:
if future.done():
try:
batch_results = future.result(timeout=1)
for result in batch_results:
result_queue.put(result)
done.append(future)
except Exception as e:
print(f"[警告] 获取future结果时出错: {str(e)}")
# 从futures列表中移除已完成的
for future in done:
futures.remove(future)
# 强制垃圾回收
if len(done) > 0:
gc.collect()
def _is_frame_different(self, frame1, frame2, simplified=False, threshold=0.8):
"""简化版本的帧差异检测"""
if simplified:
try:
# 使用更简单的比较方法
gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
# 计算平均差异
diff = cv2.absdiff(gray1, gray2)
mean_diff = np.mean(diff)
# 如果差异小于阈值,认为帧相同
return mean_diff > threshold * 10 # 可调整的阈值
except Exception:
return True
else:
# 完整的特征比较逻辑
return True # 默认认为不同,具体实现可以根据需要添加
def is_blank_frame(self, frame, simplified=False):
"""检测是否为无信息帧(支持简化版本)"""
try:
if simplified:
# 简化版本:只检查亮度和方差
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
mean = np.mean(gray)
std = np.std(gray)
return mean < 10 or mean > 245 or std < 20
else:
# 完整版本的检查逻辑
return super().is_blank_frame(frame)
except Exception as e:
print(f"[警告] 检查无信息帧时出错: {str(e)}")
return True
@staticmethod
def transcribe_audio(video_path: str, model_name: str = WHISPER_MODEL) -> list:
"""语音识别与时间戳获取(支持中英文混合)"""
try:
# 创建进度跟踪器
progress = ProgressTracker(100, "语音识别")
progress.update(10, "加载模型")
# 使用更大的模型提高准确率
model = whisper.load_model(model_name, device="cpu", download_root=MODEL_DIR)
progress.update(20, "开始转写")
# 配置转写参数
result = model.transcribe(
video_path,
fp16=False,
language="zh",
task="transcribe",
verbose=True,
initial_prompt="这是一段包含中英文的PPT讲解视频可能包含专业术语。"
)
progress.update(60, "处理转写结果")
segments = result.get("segments", [])
# 后处理:专业术语替换
for i, seg in enumerate(segments):
text = seg["text"]
for cn, en in PROFESSIONAL_TERMS.items():
text = text.replace(cn, f"{cn}({en})")
seg["text"] = text
progress.update(30 / len(segments), f"处理第{i + 1}/{len(segments)}个片段")
progress.complete(f"识别了{len(segments)}个语音片段")
return segments
except Exception as e:
print(f"[错误] 语音识别失败: {str(e)}")
return []
# ---------------------- 业务逻辑模块 ----------------------
class ContentAligner:
@staticmethod
def generate_page_intervals(timestamps: list, duration: float) -> list:
"""生成页面时间段"""
intervals = []
for i in range(len(timestamps)):
start = timestamps[i]
end = timestamps[i + 1] if i < len(timestamps) - 1 else duration
intervals.append((start, end))
return intervals
@staticmethod
@lru_cache(maxsize=CACHE_SIZE)
def calculate_text_similarity(text1: str, text2: str) -> float:
"""计算文本相似度(带缓存)"""
# 使用简单的词重叠度计算
words1 = set(re.findall(r'\w+', text1.lower()))
words2 = set(re.findall(r'\w+', text2.lower()))
if not words1 or not words2:
return 0.0
intersection = words1.intersection(words2)
union = words1.union(words2)
return len(intersection) / len(union)
@staticmethod
def _process_segment(seg, seg_start, intervals, all_segments):
"""处理单个语音片段(用于并行处理)"""
# 首先尝试时间戳匹配
for page_idx, (start, end) in enumerate(intervals):
if start <= seg_start < end:
return page_idx, seg
# 如果时间戳匹配失败,尝试文本相似度匹配
best_page = None
best_score = 0.0
for page_idx, (start, end) in enumerate(intervals):
# 获取该页面的所有文本
page_text = " ".join([s["text"] for s in all_segments if start <= s["start"] < end])
similarity = ContentAligner.calculate_text_similarity(seg["text"], page_text)
if similarity > best_score:
best_score = similarity
best_page = page_idx
if best_page is not None:
return best_page, seg
return None
@staticmethod
def find_best_match(segments: list, intervals: list) -> dict:
"""为每个语音片段找到最佳匹配的页面(并行处理)"""
page_texts = defaultdict(list)
unmatched_segments = []
# 创建进度跟踪器
progress = ProgressTracker(len(segments), "内容对齐")
# 使用线程池进行并行处理
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = []
for seg in segments:
seg_start = seg["start"]
future = executor.submit(ContentAligner._process_segment, seg, seg_start, intervals, segments)
futures.append(future)
# 收集结果
for i, future in enumerate(concurrent.futures.as_completed(futures)):
try:
result = future.result()
if result:
page_idx, seg = result
page_texts[page_idx].append(seg)
else:
unmatched_segments.append(seg)
progress.update(1, f"处理第{i + 1}/{len(segments)}个片段")
except Exception as e:
print(f"[警告] 处理语音片段时出错: {str(e)}")
# 处理未匹配的片段
if unmatched_segments:
print(f"[警告] 发现{len(unmatched_segments)}个未匹配的语音片段")
# 将未匹配片段添加到最近的页面
for seg in unmatched_segments:
closest_page = min(range(len(intervals)),
key=lambda i: abs(seg["start"] - (intervals[i][0] + intervals[i][1]) / 2))
page_texts[closest_page].append(seg)
progress.complete(f"对齐了{len(segments)}个语音片段")
return page_texts
@staticmethod
def align_content(video_path: str, timestamps: list) -> list:
"""语音-画面对齐主逻辑(改进版,并行处理)"""
try:
# 创建超时处理器
timeout_handler = TimeoutHandler()
timeout_handler.start("内容对齐")
# 获取视频时长
try:
reader = imageio.get_reader(video_path)
duration = reader.get_meta_data()["duration"]
reader.close()
except:
duration = timestamps[-1] + FRAME_INTERVAL
# 语音识别
segments = VideoProcessor.transcribe_audio(video_path)
if not segments:
logging.warning("未识别到语音内容,将生成空文本摘要")
segments = []
# 生成页面时间间隔
intervals = ContentAligner.generate_page_intervals(timestamps, duration)
# 使用改进的匹配算法(并行处理)
page_texts = ContentAligner.find_best_match(segments, intervals)
# 生成最终的对齐数据
aligned_data = []
for idx in range(len(intervals)):
text = " ".join([seg["text"] for seg in page_texts.get(idx, [])])
aligned_data.append({
"page": idx,
"start_time": intervals[idx][0],
"end_time": intervals[idx][1],
"text": text if text else "未识别到相关语音内容"
})
# 停止超时处理
timeout_handler.stop()
return aligned_data
except Exception as e:
logging.error(f"内容对齐失败: {str(e)}")
logging.error(traceback.format_exc())
return []
# ---------------------- 摘要生成模块 ----------------------
class SummaryGenerator:
@staticmethod
def optimize_text(text: str) -> str:
"""优化文本内容"""
# 替换专业术语
for term, abbr in PROFESSIONAL_TERMS.items():
text = text.replace(term, f'<span class="professional-term">{term}</span> ({abbr})')
# 优化过渡词
for word in TRANSITION_WORDS:
text = text.replace(word, f'<span class="transition-word">{word}</span>')
return text
@staticmethod
def generate_html(aligned_data: list, keyframes: list, output_dir: str):
"""生成HTML格式的报告"""
# 创建临时目录用于存储图片
temp_img_dir = os.path.join(output_dir, "temp_images")
os.makedirs(temp_img_dir, exist_ok=True)
# 创建进度跟踪器
progress = ProgressTracker(len(aligned_data) + 1, "HTML报告生成")
# 创建超时处理器
timeout_handler = TimeoutHandler()
timeout_handler.start("HTML报告生成")
try:
# 检查输出目录权限
try:
# 尝试在输出目录创建测试文件以验证权限
test_file = os.path.join(output_dir, "test_write_permission.tmp")
with open(test_file, 'w') as f:
f.write("test")
os.remove(test_file)
logging.info(f"输出目录权限检查通过: {output_dir}")
except Exception as e:
logging.error(f"输出目录权限检查失败: {str(e)},尝试使用当前目录")
# 如果指定的输出目录不可写,则使用当前目录
output_dir = os.path.abspath(".")
temp_img_dir = os.path.join(output_dir, "temp_images")
os.makedirs(temp_img_dir, exist_ok=True)
logging.info(f"已切换到当前目录作为输出: {output_dir}")
# 性能优化:减小图片大小,加快处理
logging.info("优化图片尺寸以提高性能")
optimized_keyframes = []
for frame in keyframes:
# 限制图片最大尺寸为720p
if frame.width > 1280 or frame.height > 720:
aspect_ratio = frame.width / frame.height
if aspect_ratio > 16/9: # 宽屏
new_width = 1280
new_height = int(new_width / aspect_ratio)
else:
new_height = 720
new_width = int(new_height * aspect_ratio)
frame = frame.resize((new_width, new_height), Image.LANCZOS)
optimized_keyframes.append(frame)
keyframes = optimized_keyframes
logging.info("图片尺寸优化完成")
# 处理所有帧
pages_data = []
for idx, frame in enumerate(keyframes):
try:
page_data = SummaryGenerator._process_frame(idx, frame, aligned_data, temp_img_dir)
if page_data:
pages_data.append(page_data)
progress.update(1, f"处理第 {idx + 1}")
except Exception as e:
logging.error(f"处理帧 {idx} 时出错: {str(e)}")
logging.error(traceback.format_exc())
continue
# 检查是否有成功处理的页面
if not pages_data:
logging.error("没有成功处理任何页面无法生成HTML报告")
raise RuntimeError("没有成功处理任何页面无法生成HTML报告")
# 生成HTML模板
template = Environment().from_string("""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>PPT视频结构化摘要</title>
<style>
:root {
--primary-color: #2c3e50;
--secondary-color: #3498db;
--background-color: #f8f9fa;
--text-color: #333;
}
body {
font-family: 'Arial', sans-serif;
line-height: 1.6;
color: var(--text-color);
max-width: 1200px;
margin: 0 auto;
padding: 2rem;
background-color: var(--background-color);
}
h1 {
color: var(--primary-color);
text-align: center;
margin-bottom: 2rem;
}
.page {
background: white;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
margin-bottom: 2rem;
padding: 1.5rem;
opacity: 0;
transition: opacity 0.5s ease-in-out;
}
.page-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 1rem;
padding-bottom: 0.5rem;
border-bottom: 1px solid #eee;
}
.page-number {
font-weight: bold;
color: var(--secondary-color);
}
.timestamp {
color: #666;
font-size: 0.9rem;
}
.page-content {
display: flex;
gap: 2rem;
}
.image-container {
flex: 1;
min-width: 300px;
}
.image-container img {
width: 100%;
height: auto;
border-radius: 4px;
box-shadow: 0 1px 3px rgba(0,0,0,0.1);
}
.text-content {
flex: 2;
background: var(--background-color);
padding: 1.5rem;
border-radius: 5px;
font-size: 1.1rem;
line-height: 1.8;
}
.professional-term {
color: var(--secondary-color);
font-weight: bold;
}
.transition-word {
color: #e74c3c;
font-style: italic;
}
@media (max-width: 768px) {
.page-content {
flex-direction: column;
}
.image-container {
min-width: auto;
}
}
</style>
</head>
<body>
<h1>PPT视频结构化摘要</h1>
{% for page in pages %}
<div class="page">
<div class="page-header">
<div class="page-number"> {{ page.num }} </div>
<div class="timestamp">时间区间{{ page.time }}</div>
</div>
<div class="page-content">
<div class="image-container">
<img src="{{ page.image }}" alt="页面截图">
</div>
<div class="text-content">{{ page.text }}</div>
</div>
</div>
{% endfor %}
<script>
document.addEventListener('DOMContentLoaded', function() {
const pages = document.querySelectorAll('.page');
pages.forEach((page, index) => {
setTimeout(() => {
page.style.opacity = '1';
}, index * 100);
});
});
</script>
</body>
</html>
""")
# 保存HTML文件
output_path = os.path.join(output_dir, "summary.html")
try:
with open(output_path, "w", encoding="utf-8") as f:
f.write(template.render(pages=pages_data))
logging.info(f"HTML报告已生成: {output_path}")
# 检查文件是否已成功写入
if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
logging.info(f"HTML报告验证成功: {output_path},大小: {os.path.getsize(output_path)} 字节")
else:
logging.error(f"HTML报告生成失败: 文件不存在或为空: {output_path}")
raise IOError(f"HTML报告生成失败: 文件不存在或为空: {output_path}")
except Exception as e:
logging.error(f"HTML报告保存失败: {str(e)}")
# 尝试使用备用路径
backup_path = os.path.join(os.path.abspath("."), f"summary_{int(time.time())}.html")
logging.info(f"尝试使用备用路径保存HTML: {backup_path}")
with open(backup_path, "w", encoding="utf-8") as f:
f.write(template.render(pages=pages_data))
logging.info(f"HTML报告已使用备用路径生成: {backup_path}")
output_path = backup_path # 更新输出路径
# 停止超时处理
timeout_handler.stop()
progress.complete(f"HTML报告生成完成: {output_path}")
# 打印明确的文件位置信息以便用户查找
print(f"\n[重要] HTML报告已生成在: {os.path.abspath(output_path)}\n")
except Exception as e:
logging.error(f"HTML报告生成过程中发生错误: {str(e)}")
try:
logging.error(traceback.format_exc())
except Exception:
logging.error("无法获取详细错误信息traceback模块不可用")
# 停止超时处理
timeout_handler.stop()
raise
finally:
# 清理临时文件
try:
if os.path.exists(temp_img_dir):
for f in os.listdir(temp_img_dir):
try:
os.remove(os.path.join(temp_img_dir, f))
except Exception as e:
logging.error(f"删除临时图片文件失败: {str(e)}")
try:
os.rmdir(temp_img_dir)
logging.info("已删除临时图片目录")
except Exception as e:
logging.error(f"删除临时图片目录失败: {str(e)}")
except Exception as e:
logging.error(f"清理临时文件时出错: {str(e)}")
return output_path # 返回生成的HTML文件路径
@staticmethod
def _process_frame(idx, frame, aligned_data, temp_img_dir):
"""处理单个帧"""
try:
img_path = os.path.join(temp_img_dir, f"page_{idx}.jpg")
frame.save(img_path)
with open(img_path, "rb") as f:
img_data = base64.b64encode(f.read()).decode("utf-8")
return {
"num": idx + 1,
"time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s",
"image": f"data:image/jpeg;base64,{img_data}",
"text": SummaryGenerator.optimize_text(aligned_data[idx]["text"])
}
except Exception as e:
logging.error(f"处理帧 {idx} 时出错: {str(e)}")
return None
@staticmethod
def generate_pdf(aligned_data: list, keyframes: list, output_dir: str):
"""生成PDF格式的报告"""
# 首先生成HTML文件
html_path = os.path.join(output_dir, "summary.html")
if not os.path.exists(html_path):
SummaryGenerator.generate_html(aligned_data, keyframes, output_dir)
# 创建进度跟踪器
progress = ProgressTracker(1, "PDF报告生成")
# 创建超时处理器
timeout_handler = TimeoutHandler()
timeout_handler.start("PDF报告生成")
try:
logging.info("开始将HTML转换为PDF...")
# 设置PDF配置选项
options = {
'page-size': 'A4',
'margin-top': '0.75in',
'margin-right': '0.75in',
'margin-bottom': '0.75in',
'margin-left': '0.75in',
'encoding': 'UTF-8',
'no-outline': None,
'quiet': ''
}
# 生成PDF文件路径
pdf_path = os.path.join(output_dir, "summary.pdf")
# 使用pdfkit生成PDF
try:
pdfkit.from_file(html_path, pdf_path, options=options)
logging.info(f"PDF报告已生成: {pdf_path}")
# 停止超时处理
timeout_handler.stop()
progress.complete("PDF报告生成完成")
return True
except Exception as e:
logging.error(f"PDF生成失败: {str(e)}")
return False
except Exception as e:
logging.error(f"PDF报告生成过程出错: {str(e)}")
timeout_handler.stop()
return False
@classmethod
def generate_all(cls, aligned_data: list, keyframes: list, output_dir: str):
"""生成所有格式报告"""
try:
# 首先生成HTML报告
html_path = cls.generate_html(aligned_data, keyframes, output_dir)
# 输出明确的报告位置提示
print(f"\n[完成] 报告生成成功!\n")
print(f"HTML报告地址: {os.path.abspath(html_path)}")
# 尝试生成PDF报告
pdf_success = False
try:
# 检查pdfkit模块是否可用
if 'pdfkit' in sys.modules:
pdf_success = cls.generate_pdf(aligned_data, keyframes, output_dir)
else:
logging.info("pdfkit模块不可用跳过PDF生成")
except Exception as e:
logging.error(f"PDF报告生成失败: {str(e)}")
if not pdf_success:
logging.warning("PDF生成功能不可用或生成失败仅生成HTML报告")
return True
except Exception as e:
logging.error(f"报告生成出错: {str(e)}")
logging.error(traceback.format_exc())
# 创建一个极简的报告,以确保用户至少能看到一些结果
try:
fallback_path = os.path.join(os.path.abspath("."), "emergency_report.html")
with open(fallback_path, "w", encoding="utf-8") as f:
f.write(f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>应急报告</title>
</head>
<body>
<h1>视频处理完成但报告生成失败</h1>
<p>处理过程中发生了以下错误:</p>
<pre>{str(e)}</pre>
<p>请查看日志文件以获取更多信息</p>
</body>
</html>
""")
print(f"\n[警告] 正常报告生成失败,已创建应急报告: {fallback_path}\n")
except Exception:
logging.error("创建应急报告也失败了")
return False
# ---------------------- 主流程控制 ----------------------
def main_process(video_path, output_dir=None, progress_callback=None):
try:
logging.info(f"开始处理视频文件: {video_path}")
# 设置输出目录
if output_dir is None:
output_dir = OUTPUT_DIR
# 检查输出目录是否存在,如果不存在则创建
try:
os.makedirs(output_dir, exist_ok=True)
logging.info(f"使用输出目录: {output_dir}")
# 检查输出目录权限
test_file = os.path.join(output_dir, "test_permission.tmp")
with open(test_file, "w") as f:
f.write("test")
os.remove(test_file)
except Exception as e:
logging.error(f"输出目录异常: {str(e)},使用当前目录作为替代")
output_dir = os.path.abspath(".")
os.makedirs(output_dir, exist_ok=True)
logging.info(f"已切换到当前目录: {output_dir}")
# 进度回调函数
def update_progress(progress, message=None):
if progress_callback:
try:
progress_callback(progress, message)
except Exception as e:
logging.error(f"进度回调函数执行失败: {str(e)}")
logging.info(f"处理进度: {progress}% - {message if message else ''}")
# 初始化进度
update_progress(0, "开始处理视频")
# 检查视频文件是否存在
if not os.path.exists(video_path):
error_msg = f"视频文件不存在: {video_path}"
logging.error(error_msg)
update_progress(0, f"错误: {error_msg}")
raise FileNotFoundError(error_msg)
# 检查文件大小
file_size = os.path.getsize(video_path) / (1024 * 1024) # 转换为MB
logging.info(f"视频文件大小: {file_size:.2f}MB")
# 检查文件是否为空
if file_size == 0:
error_msg = "视频文件为空"
logging.error(error_msg)
update_progress(0, f"错误: {error_msg}")
raise ValueError(error_msg)
# 检查文件是否可读
try:
with open(video_path, 'rb') as f:
f.read(1024) # 尝试读取一小块数据
except Exception as e:
error_msg = f"视频文件无法读取: {str(e)}"
logging.error(error_msg)
update_progress(0, f"错误: {error_msg}")
raise IOError(error_msg)
# 检查依赖项
update_progress(5, "检查系统依赖")
if not check_dependencies():
error_msg = "依赖项检查失败"
logging.error(error_msg)
update_progress(5, f"错误: {error_msg}")
raise RuntimeError(f"{error_msg},请检查日志获取详细信息")
update_progress(10, "依赖项检查通过")
# 初始化视频处理器
processor = VideoProcessor()
# 提取关键帧
logging.info("开始提取关键帧...")
update_progress(15, "开始提取关键帧")
try:
keyframes, duration = processor.extract_keyframes(video_path)
if not keyframes:
error_msg = "关键帧提取失败:未能提取到任何关键帧"
logging.error(error_msg)
update_progress(15, f"错误: 未能提取到关键帧")
raise RuntimeError(error_msg)
logging.info(f"成功提取 {len(keyframes)} 个关键帧,视频时长:{duration:.2f}")
update_progress(40, f"已提取 {len(keyframes)} 个关键帧")
except Exception as e:
error_msg = f"关键帧提取过程出错: {str(e)}"
logging.error(error_msg)
try:
logging.error(traceback.format_exc())
except Exception:
logging.error("无法获取详细错误信息traceback模块不可用")
update_progress(15, f"错误: 关键帧提取失败 - {str(e)}")
raise RuntimeError(error_msg)
# 转录音频
logging.info("开始转录音频...")
update_progress(45, "开始转录音频")
try:
segments = VideoProcessor.transcribe_audio(video_path)
if not segments:
logging.warning("音频转录失败:未能识别到任何语音内容")
update_progress(45, "警告: 未识别到语音内容,将生成空文本摘要")
segments = []
else:
logging.info(f"成功转录 {len(segments)} 个音频片段")
update_progress(65, f"已转录 {len(segments)} 个音频片段")
for i, seg in enumerate(segments[:3], 1): # 只记录前三个片段作为示例
logging.debug(f"音频片段 {i}: {seg['text'][:50]}...")
except Exception as e:
error_msg = f"音频转录过程出错: {str(e)}"
logging.error(error_msg)
try:
logging.error(traceback.format_exc())
except Exception:
logging.error("无法获取详细错误信息traceback模块不可用")
update_progress(45, f"错误: 音频转录失败 - {str(e)}")
raise RuntimeError(error_msg)
# 计算时间戳
timestamps = [0] # 添加起始时间戳
for frame_idx, frame in enumerate(keyframes[1:], 1):
timestamps.append(frame_idx * duration / len(keyframes))
# 对齐内容
logging.info("开始对齐内容...")
update_progress(70, "开始对齐内容")
try:
aligned_data = ContentAligner.align_content(video_path, timestamps)
if not aligned_data:
error_msg = "内容对齐失败:未能生成对齐数据"
logging.error(error_msg)
update_progress(70, "错误: 内容对齐失败")
# 创建一个空的对齐数据,以便能继续生成报告
aligned_data = []
for i in range(len(keyframes)):
aligned_data.append({
"page": i,
"start_time": timestamps[i],
"end_time": timestamps[i+1] if i < len(timestamps)-1 else duration,
"text": "未能识别到相关语音内容"
})
logging.info(f"已创建{len(aligned_data)}个空内容对齐数据")
update_progress(75, "使用空内容继续处理")
else:
logging.info(f"成功对齐 {len(aligned_data)} 个内容片段")
update_progress(80, f"已对齐 {len(aligned_data)} 个内容片段")
for i, data in enumerate(aligned_data[:3], 1): # 只记录前三个对齐结果作为示例
logging.debug(f"对齐片段 {i}: {data.get('start_time', 'N/A')}s - {data.get('end_time', 'N/A')}s")
except Exception as e:
error_msg = f"内容对齐过程出错: {str(e)}"
logging.error(error_msg)
try:
logging.error(traceback.format_exc())
except Exception:
logging.error("无法获取详细错误信息traceback模块不可用")
update_progress(70, f"错误: 内容对齐失败 - {str(e)}")
# 创建一个空的对齐数据,以便能继续生成报告
aligned_data = []
for i in range(len(keyframes)):
aligned_data.append({
"page": i,
"start_time": timestamps[i],
"end_time": timestamps[i+1] if i < len(timestamps)-1 else duration,
"text": "未能识别到相关语音内容"
})
logging.info(f"已创建{len(aligned_data)}个空内容对齐数据")
update_progress(75, "使用空内容继续处理")
# 生成总结
logging.info("开始生成总结...")
update_progress(85, "开始生成报告")
try:
if SummaryGenerator.generate_all(aligned_data, keyframes, output_dir):
logging.info(f"总结生成完成,输出目录: {output_dir}")
update_progress(100, "处理完成")
# 检查HTML文件是否存在
html_path = os.path.join(output_dir, "summary.html")
if os.path.exists(html_path):
logging.info(f"报告验证成功: {html_path}")
print(f"\n[成功] 报告生成完成,位置: {os.path.abspath(html_path)}\n")
else:
logging.warning(f"报告文件不存在: {html_path}")
print(f"\n[警告] 处理似乎完成但未找到报告文件,请检查日志\n")
else:
error_msg = "报告生成失败"
logging.error(error_msg)
update_progress(85, f"错误: {error_msg}")
raise RuntimeError(error_msg)
except Exception as e:
error_msg = f"总结生成过程出错: {str(e)}"
logging.error(error_msg)
try:
logging.error(traceback.format_exc())
except Exception:
logging.error("无法获取详细错误信息traceback模块不可用")
update_progress(85, f"错误: 报告生成失败 - {str(e)}")
# 尝试创建一个简单的报告
try:
simple_html = os.path.join(output_dir, "simple_report.html")
with open(simple_html, "w", encoding="utf-8") as f:
f.write(f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>简单报告</title>
<style>
body {{ font-family: Arial, sans-serif; max-width: 1200px; margin: 0 auto; padding: 20px; }}
img {{ max-width: 100%; height: auto; }}
.frame {{ margin-bottom: 30px; border: 1px solid #eee; padding: 15px; }}
</style>
</head>
<body>
<h1>视频简单报告</h1>
<p>完整报告生成失败这是一个简化版本</p>
""")
# 添加关键帧
for i, frame in enumerate(keyframes):
# 保存图片
img_path = os.path.join(output_dir, f"frame_{i}.jpg")
frame.save(img_path)
# 添加到HTML
f.write(f"""
<div class="frame">
<h2> {i+1} </h2>
<img src="frame_{i}.jpg" alt="关键帧 {i+1}">
</div>
""")
f.write("</body></html>")
logging.info(f"简单报告已生成: {simple_html}")
print(f"\n[恢复] 创建了简单报告: {os.path.abspath(simple_html)}\n")
except Exception as inner_e:
logging.error(f"简单报告生成也失败了: {str(inner_e)}")
raise RuntimeError(error_msg)
logging.info("所有处理步骤已完成")
return True
except Exception as e:
logging.error(f"处理过程中发生错误: {str(e)}")
logging.error("详细错误信息:")
try:
logging.error(traceback.format_exc())
except Exception:
logging.error("无法获取详细错误信息traceback模块不可用")
if progress_callback:
try:
progress_callback(0, f"处理失败: {str(e)}")
except:
pass
print(f"\n[错误] 处理失败: {str(e)}\n")
return False
if __name__ == "__main__":
try:
if len(sys.argv) < 2:
print("使用方法: python 毕设.py <视频文件路径>")
sys.exit(1)
video_path = sys.argv[1]
if main_process(video_path):
print("[完成] 处理成功")
sys.exit(0)
else:
print("[错误] 处理失败,请查看日志文件了解详情")
sys.exit(1)
except KeyboardInterrupt:
print("\n[中断] 用户中断了处理")
sys.exit(130)
except Exception as e:
print(f"[错误] 程序执行过程中出现未处理的异常: {str(e)}")
try:
traceback.print_exc()
except Exception:
print("无法打印详细错误信息traceback模块不可用")
sys.exit(1)