PPT/3.0/毕设.py
2025-04-24 12:43:44 +08:00

576 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import numpy as np
from skimage.metrics import structural_similarity as ssim
from moviepy.editor import VideoFileClip
from PIL import Image
import os
from scipy.signal import find_peaks
import torch
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import whisper
from collections import defaultdict
import re
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import Paragraph, Image as RLImage
from reportlab.lib.units import inch
import threading
import pdfkit
from jinja2 import Environment
import io
import base64
# 全局配置
SSIM_THRESHOLD = 0.85 # 关键帧去重阈值
FRAME_INTERVAL = 2 # 抽帧间隔(秒)
TRANSITION_WORDS = ["接下来", "下一页", "如图"] # 过渡词过滤列表
WKHTMLTOPDF_PATH = r"D:\wkhtmltopdf\bin\wkhtmltopdf.exe"
class PPTSummarizer:
def __init__(self, video_path, output_dir=None):
self.video_path = video_path
# 如果没有指定输出目录,则使用默认目录
if output_dir is None:
# 使用视频文件名作为输出目录名
video_name = os.path.splitext(os.path.basename(video_path))[0]
self.output_dir = os.path.join("output", video_name)
else:
self.output_dir = output_dir
self.frames = []
self.key_frames = []
self.text_content = []
self.frame_timestamps = []
self.aligned_data = []
self.processing_complete = threading.Event()
# 创建输出目录
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
# 初始化whisper模型
self.whisper_model = whisper.load_model("tiny", device="cpu")
def extract_frames(self):
"""提取视频帧"""
try:
# 使用VideoFileClip替代cv2
video = VideoFileClip(self.video_path)
duration = video.duration
fps = video.fps
# 计算采样间隔
sample_interval = max(1 / fps, FRAME_INTERVAL)
timestamps = np.arange(0, duration, sample_interval)
print(f"开始提取帧,视频时长:{duration:.2f}FPS{fps}")
# 提取帧
for t in timestamps:
try:
frame = video.get_frame(t)
# 转换为BGR格式OpenCV格式
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
self.frames.append(frame_bgr)
self.frame_timestamps.append(t)
except Exception as e:
print(f"提取帧 {t}s 时出错: {str(e)}")
continue
video.close()
print(f"成功提取 {len(self.frames)}")
return fps
except Exception as e:
print(f"视频处理出错: {str(e)}")
return 0
def process_audio(self):
"""处理音频"""
try:
print("开始语音识别...")
# 使用更大的模型以提高识别准确度
result = self.whisper_model.transcribe(
self.video_path,
fp16=False,
language="zh",
task="transcribe",
verbose=True
)
segments = result.get("segments", [])
print(f"语音识别完成,共识别出 {len(segments)} 个片段")
# 打印识别结果
for i, seg in enumerate(segments):
try:
print(f"片段 {i + 1}: {seg['start']:.1f}s - {seg['end']:.1f}s: {seg['text']}")
except UnicodeEncodeError:
print(f"片段 {i + 1}: {seg['start']:.1f}s - {seg['end']:.1f}s: [文本包含特殊字符]")
# 生成页面时间段
intervals = []
for i in range(len(self.frame_timestamps)):
start = self.frame_timestamps[i]
end = self.frame_timestamps[i + 1] if i < len(self.frame_timestamps) - 1 else self.frame_timestamps[
-1] + 1
intervals.append((start, end))
print(f"页面 {i + 1} 时间段: {start:.1f}s - {end:.1f}s")
# 改进的对齐逻辑
page_texts = defaultdict(list)
page_segments = defaultdict(list) # 存储每个页面对应的片段
all_texts = [] # 存储所有文本片段
used_texts = set() # 跟踪已使用的文本
# 第一步:收集所有文本片段
for seg in segments:
try:
seg_start = seg["start"]
seg_end = seg["end"]
seg_text = seg["text"].strip()
all_texts.append((seg_start, seg_end, seg_text))
except Exception as e:
print(f"处理片段时出错: {str(e)}")
continue
# 第二步:将文本分配到各个页面
for start, end, text in all_texts:
try:
# 找到与当前片段时间重叠的所有页面
overlapping_pages = []
for page_idx, (page_start, page_end) in enumerate(intervals):
if (start <= page_end and end >= page_start):
overlapping_pages.append((page_idx, page_start, page_end))
# 如果找到重叠页面,将文本添加到最合适的页面
if overlapping_pages:
# 计算每个页面的重叠时间
page_overlaps = []
for page_idx, page_start, page_end in overlapping_pages:
overlap_start = max(start, page_start)
overlap_end = min(end, page_end)
overlap_duration = overlap_end - overlap_start
page_overlaps.append((page_idx, overlap_duration))
# 按重叠时间排序
page_overlaps.sort(key=lambda x: x[1], reverse=True)
# 将文本添加到重叠时间最长的页面
best_page = page_overlaps[0][0]
if text not in used_texts: # 确保文本未被使用
page_texts[best_page].append(text)
page_segments[best_page].append((start, end, text))
used_texts.add(text)
print(f"将文本 '{text}' 添加到页面 {best_page + 1}")
except Exception as e:
print(f"分配文本时出错: {str(e)}")
continue
# 第三步:优化每个页面的文本
self.aligned_data = []
for idx in range(len(intervals)):
try:
# 获取当前页面的所有片段
segments = page_segments[idx]
# 按时间排序
segments.sort(key=lambda x: x[0])
# 合并相邻的相似文本
merged_texts = []
current_text = ""
last_end_time = 0
for start, end, text in segments:
# 如果当前文本为空,直接添加
if not current_text:
current_text = text
last_end_time = end
continue
# 计算时间间隔
time_gap = start - last_end_time
# 如果时间间隔小于3秒合并文本
if time_gap < 3.0: # 增加时间间隔阈值
current_text += " " + text
else:
merged_texts.append(current_text)
current_text = text
last_end_time = end
# 添加最后一个文本
if current_text:
merged_texts.append(current_text)
# 合并所有文本
final_text = " ".join(merged_texts)
# 如果当前页面文本为空,尝试从前一页面获取
if not final_text and idx > 0:
final_text = self.aligned_data[idx - 1]["text"]
# 优化文本
optimized_text = self.optimize_text(final_text)
if optimized_text:
print(f"页面 {idx + 1} 的优化后文本内容: {optimized_text}")
self.aligned_data.append({
"page": idx,
"start_time": intervals[idx][0],
"end_time": intervals[idx][1],
"text": optimized_text
})
except Exception as e:
print(f"处理页面 {idx + 1} 时出错: {str(e)}")
# 添加空数据
self.aligned_data.append({
"page": idx,
"start_time": intervals[idx][0],
"end_time": intervals[idx][1],
"text": ""
})
# 第四步:确保所有文本都被包含
# 检查是否有遗漏的文本片段
for start, end, text in all_texts:
try:
if text not in used_texts:
# 找到最后一个非空页面
last_non_empty_page = -1
for i in range(len(self.aligned_data) - 1, -1, -1):
if self.aligned_data[i]["text"]:
last_non_empty_page = i
break
if last_non_empty_page >= 0:
self.aligned_data[last_non_empty_page]["text"] += " " + text
print(f"将遗漏的文本 '{text}' 添加到页面 {last_non_empty_page + 1}")
except Exception as e:
print(f"处理遗漏文本时出错: {str(e)}")
continue
# 保存对齐数据到文件
try:
with open(os.path.join(self.output_dir, 'aligned_data.txt'), 'w', encoding='utf-8') as f:
for data in self.aligned_data:
f.write(f"页面 {data['page'] + 1}:\n")
f.write(f"时间: {data['start_time']:.1f}s - {data['end_time']:.1f}s\n")
f.write(f"文本: {data['text']}\n\n")
print("对齐数据已保存到文件")
except Exception as e:
print(f"保存对齐数据时出错: {str(e)}")
except Exception as e:
print(f"音频处理出错: {str(e)}")
self.aligned_data = []
def process_frames(self):
"""处理视频帧"""
try:
print("开始处理视频帧...")
# 计算帧间相似度
similarities = []
for i in range(len(self.frames) - 1):
try:
frame1 = cv2.cvtColor(self.frames[i], cv2.COLOR_BGR2GRAY)
frame2 = cv2.cvtColor(self.frames[i + 1], cv2.COLOR_BGR2GRAY)
similarity = ssim(frame1, frame2)
similarities.append(similarity)
except Exception as e:
print(f"计算帧 {i} 相似度时出错: {str(e)}")
similarities.append(1.0) # 出错时假设帧相似
# 使用自适应阈值
mean_similarity = np.mean(similarities)
std_similarity = np.std(similarities)
adaptive_threshold = mean_similarity - 2 * std_similarity
# 使用峰值检测找到关键帧
peaks, _ = find_peaks([1 - s for s in similarities],
height=1 - adaptive_threshold,
distance=int(len(similarities) / 20)) # 最小距离
# 保存关键帧
for peak in peaks:
if not self.is_blank_frame(self.frames[peak]):
self.key_frames.append(self.frames[peak])
print(f"找到 {len(self.key_frames)} 个关键帧")
except Exception as e:
print(f"处理视频帧时出错: {str(e)}")
def is_blank_frame(self, frame, threshold=30):
"""检测是否为空白帧"""
try:
# 转换为灰度图
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 计算图像统计特征
mean = np.mean(gray)
std_dev = np.std(gray)
# 检查是否为纯黑或纯白
is_black = mean < 10 and std_dev < 5
is_white = mean > 245 and std_dev < 5
# 检查是否有足够的细节
has_detail = std_dev > threshold
return is_black or is_white or not has_detail
except Exception as e:
print(f"检查空白帧时出错: {str(e)}")
return True
def optimize_text(self, text):
"""文本优化"""
try:
if not text:
return ""
# 过滤过渡词
sentences = re.split(r'[。!?]', text)
filtered = []
seen = set()
for sent in sentences:
sent = sent.strip()
if (len(sent) >= 10
and not any(word in sent for word in TRANSITION_WORDS)
and sent not in seen):
filtered.append(sent)
seen.add(sent)
result = ''.join(filtered) + '' if filtered else ""
if result:
print(f"优化后的文本: {result}")
return result
except Exception as e:
print(f"文本优化时出错: {str(e)}")
return text
def save_results(self):
"""保存结果"""
try:
# 检查输出目录权限
if not os.access(self.output_dir, os.W_OK):
print(f"错误:没有写入权限: {self.output_dir}")
return
# 生成PDF文档
pdf_path = os.path.join(self.output_dir, 'summary.pdf')
# 创建临时HTML文件
temp_html = os.path.join(self.output_dir, "_temp_pdf.html")
temp_img_dir = os.path.join(self.output_dir, "_temp_pdf_images")
os.makedirs(temp_img_dir, exist_ok=True)
try:
# 使用绝对路径
abs_temp_img_dir = os.path.abspath(temp_img_dir)
html_content = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<style>
@page {
margin: 20px;
size: A4;
}
body {
font-family: "Microsoft YaHei", "SimSun", sans-serif;
line-height: 1.6;
margin: 0;
padding: 20px;
}
.page {
page-break-inside: avoid;
margin-bottom: 30px;
padding: 20px;
background-color: white;
}
img {
max-width: 100%;
height: auto;
display: block;
margin: 10px auto;
}
.timestamp {
color: #666;
font-size: 12pt;
margin: 10px 0;
}
.content {
font-size: 14pt;
line-height: 1.6;
margin: 15px 0;
}
h1 {
text-align: center;
color: #333;
margin-bottom: 30px;
}
h2 {
color: #444;
margin: 15px 0;
}
</style>
</head>
<body>
<h1>PPT视频结构化摘要</h1>
{% for page in pages %}
<div class="page">
<h2>页面 {{ page.num }}</h2>
<div class="timestamp">{{ page.time }}</div>
<img src="{{ page.image_path }}" alt="页面截图">
<div class="content">{{ page.text }}</div>
</div>
{% endfor %}
</body>
</html>
"""
pages_data = []
for idx, frame in enumerate(self.key_frames):
try:
img_filename = f"page_{idx}.jpg"
img_path = os.path.join(abs_temp_img_dir, img_filename)
# 将numpy数组转换为PIL Image对象
if len(frame.shape) == 3 and frame.shape[2] == 3:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
else:
frame_rgb = frame
# 创建PIL Image对象
img = Image.fromarray(frame_rgb)
# 调整图片大小
max_width = 800
if img.width > max_width:
ratio = max_width / img.width
new_height = int(img.height * ratio)
img = img.resize((max_width, new_height), Image.Resampling.LANCZOS)
# 保存图片
img.save(img_path, format='JPEG', quality=85, optimize=True)
# 获取从开始到当前帧的所有文本
current_time = self.aligned_data[idx]['end_time']
# 收集从开始到当前时间点的所有文本
texts = []
for data in self.aligned_data:
if data['end_time'] <= current_time:
if data['text']:
texts.append(data['text'])
# 合并文本
combined_text = " ".join(texts)
# 如果只有一帧关键帧,显示整个视频的所有文本
if len(self.key_frames) == 1:
all_texts = []
for data in self.aligned_data:
if data['text']:
all_texts.append(data['text'])
combined_text = " ".join(all_texts)
# 添加file://前缀到图片路径
img_path_with_prefix = f"file:///{img_path.replace(os.sep, '/')}"
pages_data.append({
"num": idx + 1,
"time": f"{self.aligned_data[idx]['start_time']:.1f}s - {self.aligned_data[idx]['end_time']:.1f}s",
"image_path": img_path_with_prefix,
"text": combined_text
})
except Exception as e:
print(f"处理页面 {idx + 1} 时出错: {str(e)}")
continue
# 生成HTML文件
env = Environment()
template = env.from_string(html_content)
with open(temp_html, "w", encoding="utf-8") as f:
f.write(template.render(pages=pages_data))
# 设置PDF生成选项
options = {
"enable-local-file-access": "",
"encoding": "UTF-8",
"margin-top": "15mm",
"margin-bottom": "15mm",
"margin-left": "15mm",
"margin-right": "15mm",
"quiet": "",
"print-media-type": "",
"page-size": "A4",
"orientation": "Portrait"
}
config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH)
# 生成PDF
pdfkit.from_file(
temp_html,
pdf_path,
configuration=config,
options=options
)
print(f"PDF已保存到: {pdf_path}")
finally:
# 清理临时文件
if os.path.exists(temp_html):
os.remove(temp_html)
if os.path.exists(temp_img_dir):
for f in os.listdir(temp_img_dir):
os.remove(os.path.join(temp_img_dir, f))
os.rmdir(temp_img_dir)
except Exception as e:
print(f"保存结果时出错: {str(e)}")
def process(self):
"""处理视频并生成摘要"""
try:
start_time = time.time()
print("开始处理视频...")
# 提取视频帧
self.extract_frames()
# 创建线程池
with ThreadPoolExecutor(max_workers=2) as executor:
# 提交音频处理和帧处理任务
audio_future = executor.submit(self.process_audio)
frames_future = executor.submit(self.process_frames)
# 等待两个任务完成
audio_future.result()
frames_future.result()
print("生成摘要...")
self.save_results()
end_time = time.time()
print(f"处理完成!总耗时:{end_time - start_time:.2f}")
except Exception as e:
print(f"处理过程出错: {str(e)}")
raise # 重新抛出异常以便调试
if __name__ == "__main__":
# 使用示例
video_path = "D:/python项目文件/1/input3.mp4" # 替换为实际的视频路径
output_dir = "custom_output" # 自定义输出路径
summarizer = PPTSummarizer(video_path, output_dir)
summarizer.process()