上传文件至 3.0
This commit is contained in:
parent
71c468026f
commit
463a2ead70
BIN
3.0/input3.mp4
Normal file
BIN
3.0/input3.mp4
Normal file
Binary file not shown.
BIN
3.0/summary.pdf
Normal file
BIN
3.0/summary.pdf
Normal file
Binary file not shown.
576
3.0/毕设.py
Normal file
576
3.0/毕设.py
Normal file
@ -0,0 +1,576 @@
|
||||
import cv2
|
||||
import numpy as np
|
||||
from skimage.metrics import structural_similarity as ssim
|
||||
from moviepy.editor import VideoFileClip
|
||||
from PIL import Image
|
||||
import os
|
||||
from scipy.signal import find_peaks
|
||||
import torch
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
import time
|
||||
import whisper
|
||||
from collections import defaultdict
|
||||
import re
|
||||
from reportlab.lib.pagesizes import A4
|
||||
from reportlab.pdfgen import canvas
|
||||
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
||||
from reportlab.platypus import Paragraph, Image as RLImage
|
||||
from reportlab.lib.units import inch
|
||||
import threading
|
||||
import pdfkit
|
||||
from jinja2 import Environment
|
||||
import io
|
||||
import base64
|
||||
|
||||
# 全局配置
|
||||
SSIM_THRESHOLD = 0.85 # 关键帧去重阈值
|
||||
FRAME_INTERVAL = 2 # 抽帧间隔(秒)
|
||||
TRANSITION_WORDS = ["接下来", "下一页", "如图"] # 过渡词过滤列表
|
||||
WKHTMLTOPDF_PATH = r"D:\wkhtmltopdf\bin\wkhtmltopdf.exe"
|
||||
|
||||
|
||||
class PPTSummarizer:
|
||||
def __init__(self, video_path, output_dir=None):
|
||||
self.video_path = video_path
|
||||
# 如果没有指定输出目录,则使用默认目录
|
||||
if output_dir is None:
|
||||
# 使用视频文件名作为输出目录名
|
||||
video_name = os.path.splitext(os.path.basename(video_path))[0]
|
||||
self.output_dir = os.path.join("output", video_name)
|
||||
else:
|
||||
self.output_dir = output_dir
|
||||
|
||||
self.frames = []
|
||||
self.key_frames = []
|
||||
self.text_content = []
|
||||
self.frame_timestamps = []
|
||||
self.aligned_data = []
|
||||
self.processing_complete = threading.Event()
|
||||
|
||||
# 创建输出目录
|
||||
if not os.path.exists(self.output_dir):
|
||||
os.makedirs(self.output_dir)
|
||||
|
||||
# 初始化whisper模型
|
||||
self.whisper_model = whisper.load_model("tiny", device="cpu")
|
||||
|
||||
def extract_frames(self):
|
||||
"""提取视频帧"""
|
||||
try:
|
||||
# 使用VideoFileClip替代cv2
|
||||
video = VideoFileClip(self.video_path)
|
||||
duration = video.duration
|
||||
fps = video.fps
|
||||
|
||||
# 计算采样间隔
|
||||
sample_interval = max(1 / fps, FRAME_INTERVAL)
|
||||
timestamps = np.arange(0, duration, sample_interval)
|
||||
|
||||
print(f"开始提取帧,视频时长:{duration:.2f}秒,FPS:{fps}")
|
||||
|
||||
# 提取帧
|
||||
for t in timestamps:
|
||||
try:
|
||||
frame = video.get_frame(t)
|
||||
# 转换为BGR格式(OpenCV格式)
|
||||
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
|
||||
self.frames.append(frame_bgr)
|
||||
self.frame_timestamps.append(t)
|
||||
except Exception as e:
|
||||
print(f"提取帧 {t}s 时出错: {str(e)}")
|
||||
continue
|
||||
|
||||
video.close()
|
||||
print(f"成功提取 {len(self.frames)} 帧")
|
||||
return fps
|
||||
|
||||
except Exception as e:
|
||||
print(f"视频处理出错: {str(e)}")
|
||||
return 0
|
||||
|
||||
def process_audio(self):
|
||||
"""处理音频"""
|
||||
try:
|
||||
print("开始语音识别...")
|
||||
# 使用更大的模型以提高识别准确度
|
||||
result = self.whisper_model.transcribe(
|
||||
self.video_path,
|
||||
fp16=False,
|
||||
language="zh",
|
||||
task="transcribe",
|
||||
verbose=True
|
||||
)
|
||||
segments = result.get("segments", [])
|
||||
print(f"语音识别完成,共识别出 {len(segments)} 个片段")
|
||||
|
||||
# 打印识别结果
|
||||
for i, seg in enumerate(segments):
|
||||
try:
|
||||
print(f"片段 {i + 1}: {seg['start']:.1f}s - {seg['end']:.1f}s: {seg['text']}")
|
||||
except UnicodeEncodeError:
|
||||
print(f"片段 {i + 1}: {seg['start']:.1f}s - {seg['end']:.1f}s: [文本包含特殊字符]")
|
||||
|
||||
# 生成页面时间段
|
||||
intervals = []
|
||||
for i in range(len(self.frame_timestamps)):
|
||||
start = self.frame_timestamps[i]
|
||||
end = self.frame_timestamps[i + 1] if i < len(self.frame_timestamps) - 1 else self.frame_timestamps[
|
||||
-1] + 1
|
||||
intervals.append((start, end))
|
||||
print(f"页面 {i + 1} 时间段: {start:.1f}s - {end:.1f}s")
|
||||
|
||||
# 改进的对齐逻辑
|
||||
page_texts = defaultdict(list)
|
||||
page_segments = defaultdict(list) # 存储每个页面对应的片段
|
||||
all_texts = [] # 存储所有文本片段
|
||||
used_texts = set() # 跟踪已使用的文本
|
||||
|
||||
# 第一步:收集所有文本片段
|
||||
for seg in segments:
|
||||
try:
|
||||
seg_start = seg["start"]
|
||||
seg_end = seg["end"]
|
||||
seg_text = seg["text"].strip()
|
||||
all_texts.append((seg_start, seg_end, seg_text))
|
||||
except Exception as e:
|
||||
print(f"处理片段时出错: {str(e)}")
|
||||
continue
|
||||
|
||||
# 第二步:将文本分配到各个页面
|
||||
for start, end, text in all_texts:
|
||||
try:
|
||||
# 找到与当前片段时间重叠的所有页面
|
||||
overlapping_pages = []
|
||||
for page_idx, (page_start, page_end) in enumerate(intervals):
|
||||
if (start <= page_end and end >= page_start):
|
||||
overlapping_pages.append((page_idx, page_start, page_end))
|
||||
|
||||
# 如果找到重叠页面,将文本添加到最合适的页面
|
||||
if overlapping_pages:
|
||||
# 计算每个页面的重叠时间
|
||||
page_overlaps = []
|
||||
for page_idx, page_start, page_end in overlapping_pages:
|
||||
overlap_start = max(start, page_start)
|
||||
overlap_end = min(end, page_end)
|
||||
overlap_duration = overlap_end - overlap_start
|
||||
page_overlaps.append((page_idx, overlap_duration))
|
||||
|
||||
# 按重叠时间排序
|
||||
page_overlaps.sort(key=lambda x: x[1], reverse=True)
|
||||
|
||||
# 将文本添加到重叠时间最长的页面
|
||||
best_page = page_overlaps[0][0]
|
||||
if text not in used_texts: # 确保文本未被使用
|
||||
page_texts[best_page].append(text)
|
||||
page_segments[best_page].append((start, end, text))
|
||||
used_texts.add(text)
|
||||
print(f"将文本 '{text}' 添加到页面 {best_page + 1}")
|
||||
except Exception as e:
|
||||
print(f"分配文本时出错: {str(e)}")
|
||||
continue
|
||||
|
||||
# 第三步:优化每个页面的文本
|
||||
self.aligned_data = []
|
||||
for idx in range(len(intervals)):
|
||||
try:
|
||||
# 获取当前页面的所有片段
|
||||
segments = page_segments[idx]
|
||||
|
||||
# 按时间排序
|
||||
segments.sort(key=lambda x: x[0])
|
||||
|
||||
# 合并相邻的相似文本
|
||||
merged_texts = []
|
||||
current_text = ""
|
||||
last_end_time = 0
|
||||
|
||||
for start, end, text in segments:
|
||||
# 如果当前文本为空,直接添加
|
||||
if not current_text:
|
||||
current_text = text
|
||||
last_end_time = end
|
||||
continue
|
||||
|
||||
# 计算时间间隔
|
||||
time_gap = start - last_end_time
|
||||
|
||||
# 如果时间间隔小于3秒,合并文本
|
||||
if time_gap < 3.0: # 增加时间间隔阈值
|
||||
current_text += " " + text
|
||||
else:
|
||||
merged_texts.append(current_text)
|
||||
current_text = text
|
||||
|
||||
last_end_time = end
|
||||
|
||||
# 添加最后一个文本
|
||||
if current_text:
|
||||
merged_texts.append(current_text)
|
||||
|
||||
# 合并所有文本
|
||||
final_text = " ".join(merged_texts)
|
||||
|
||||
# 如果当前页面文本为空,尝试从前一页面获取
|
||||
if not final_text and idx > 0:
|
||||
final_text = self.aligned_data[idx - 1]["text"]
|
||||
|
||||
# 优化文本
|
||||
optimized_text = self.optimize_text(final_text)
|
||||
|
||||
if optimized_text:
|
||||
print(f"页面 {idx + 1} 的优化后文本内容: {optimized_text}")
|
||||
|
||||
self.aligned_data.append({
|
||||
"page": idx,
|
||||
"start_time": intervals[idx][0],
|
||||
"end_time": intervals[idx][1],
|
||||
"text": optimized_text
|
||||
})
|
||||
except Exception as e:
|
||||
print(f"处理页面 {idx + 1} 时出错: {str(e)}")
|
||||
# 添加空数据
|
||||
self.aligned_data.append({
|
||||
"page": idx,
|
||||
"start_time": intervals[idx][0],
|
||||
"end_time": intervals[idx][1],
|
||||
"text": ""
|
||||
})
|
||||
|
||||
# 第四步:确保所有文本都被包含
|
||||
# 检查是否有遗漏的文本片段
|
||||
for start, end, text in all_texts:
|
||||
try:
|
||||
if text not in used_texts:
|
||||
# 找到最后一个非空页面
|
||||
last_non_empty_page = -1
|
||||
for i in range(len(self.aligned_data) - 1, -1, -1):
|
||||
if self.aligned_data[i]["text"]:
|
||||
last_non_empty_page = i
|
||||
break
|
||||
|
||||
if last_non_empty_page >= 0:
|
||||
self.aligned_data[last_non_empty_page]["text"] += " " + text
|
||||
print(f"将遗漏的文本 '{text}' 添加到页面 {last_non_empty_page + 1}")
|
||||
except Exception as e:
|
||||
print(f"处理遗漏文本时出错: {str(e)}")
|
||||
continue
|
||||
|
||||
# 保存对齐数据到文件
|
||||
try:
|
||||
with open(os.path.join(self.output_dir, 'aligned_data.txt'), 'w', encoding='utf-8') as f:
|
||||
for data in self.aligned_data:
|
||||
f.write(f"页面 {data['page'] + 1}:\n")
|
||||
f.write(f"时间: {data['start_time']:.1f}s - {data['end_time']:.1f}s\n")
|
||||
f.write(f"文本: {data['text']}\n\n")
|
||||
print("对齐数据已保存到文件")
|
||||
except Exception as e:
|
||||
print(f"保存对齐数据时出错: {str(e)}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"音频处理出错: {str(e)}")
|
||||
self.aligned_data = []
|
||||
|
||||
def process_frames(self):
|
||||
"""处理视频帧"""
|
||||
try:
|
||||
print("开始处理视频帧...")
|
||||
# 计算帧间相似度
|
||||
similarities = []
|
||||
for i in range(len(self.frames) - 1):
|
||||
try:
|
||||
frame1 = cv2.cvtColor(self.frames[i], cv2.COLOR_BGR2GRAY)
|
||||
frame2 = cv2.cvtColor(self.frames[i + 1], cv2.COLOR_BGR2GRAY)
|
||||
similarity = ssim(frame1, frame2)
|
||||
similarities.append(similarity)
|
||||
except Exception as e:
|
||||
print(f"计算帧 {i} 相似度时出错: {str(e)}")
|
||||
similarities.append(1.0) # 出错时假设帧相似
|
||||
|
||||
# 使用自适应阈值
|
||||
mean_similarity = np.mean(similarities)
|
||||
std_similarity = np.std(similarities)
|
||||
adaptive_threshold = mean_similarity - 2 * std_similarity
|
||||
|
||||
# 使用峰值检测找到关键帧
|
||||
peaks, _ = find_peaks([1 - s for s in similarities],
|
||||
height=1 - adaptive_threshold,
|
||||
distance=int(len(similarities) / 20)) # 最小距离
|
||||
|
||||
# 保存关键帧
|
||||
for peak in peaks:
|
||||
if not self.is_blank_frame(self.frames[peak]):
|
||||
self.key_frames.append(self.frames[peak])
|
||||
|
||||
print(f"找到 {len(self.key_frames)} 个关键帧")
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理视频帧时出错: {str(e)}")
|
||||
|
||||
def is_blank_frame(self, frame, threshold=30):
|
||||
"""检测是否为空白帧"""
|
||||
try:
|
||||
# 转换为灰度图
|
||||
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||||
|
||||
# 计算图像统计特征
|
||||
mean = np.mean(gray)
|
||||
std_dev = np.std(gray)
|
||||
|
||||
# 检查是否为纯黑或纯白
|
||||
is_black = mean < 10 and std_dev < 5
|
||||
is_white = mean > 245 and std_dev < 5
|
||||
|
||||
# 检查是否有足够的细节
|
||||
has_detail = std_dev > threshold
|
||||
|
||||
return is_black or is_white or not has_detail
|
||||
except Exception as e:
|
||||
print(f"检查空白帧时出错: {str(e)}")
|
||||
return True
|
||||
|
||||
def optimize_text(self, text):
|
||||
"""文本优化"""
|
||||
try:
|
||||
if not text:
|
||||
return ""
|
||||
|
||||
# 过滤过渡词
|
||||
sentences = re.split(r'[。!?]', text)
|
||||
filtered = []
|
||||
seen = set()
|
||||
for sent in sentences:
|
||||
sent = sent.strip()
|
||||
if (len(sent) >= 10
|
||||
and not any(word in sent for word in TRANSITION_WORDS)
|
||||
and sent not in seen):
|
||||
filtered.append(sent)
|
||||
seen.add(sent)
|
||||
|
||||
result = '。'.join(filtered) + '。' if filtered else ""
|
||||
if result:
|
||||
print(f"优化后的文本: {result}")
|
||||
return result
|
||||
except Exception as e:
|
||||
print(f"文本优化时出错: {str(e)}")
|
||||
return text
|
||||
|
||||
def save_results(self):
|
||||
"""保存结果"""
|
||||
try:
|
||||
# 检查输出目录权限
|
||||
if not os.access(self.output_dir, os.W_OK):
|
||||
print(f"错误:没有写入权限: {self.output_dir}")
|
||||
return
|
||||
|
||||
# 生成PDF文档
|
||||
pdf_path = os.path.join(self.output_dir, 'summary.pdf')
|
||||
|
||||
# 创建临时HTML文件
|
||||
temp_html = os.path.join(self.output_dir, "_temp_pdf.html")
|
||||
temp_img_dir = os.path.join(self.output_dir, "_temp_pdf_images")
|
||||
os.makedirs(temp_img_dir, exist_ok=True)
|
||||
|
||||
try:
|
||||
# 使用绝对路径
|
||||
abs_temp_img_dir = os.path.abspath(temp_img_dir)
|
||||
|
||||
html_content = """
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<style>
|
||||
@page {
|
||||
margin: 20px;
|
||||
size: A4;
|
||||
}
|
||||
body {
|
||||
font-family: "Microsoft YaHei", "SimSun", sans-serif;
|
||||
line-height: 1.6;
|
||||
margin: 0;
|
||||
padding: 20px;
|
||||
}
|
||||
.page {
|
||||
page-break-inside: avoid;
|
||||
margin-bottom: 30px;
|
||||
padding: 20px;
|
||||
background-color: white;
|
||||
}
|
||||
img {
|
||||
max-width: 100%;
|
||||
height: auto;
|
||||
display: block;
|
||||
margin: 10px auto;
|
||||
}
|
||||
.timestamp {
|
||||
color: #666;
|
||||
font-size: 12pt;
|
||||
margin: 10px 0;
|
||||
}
|
||||
.content {
|
||||
font-size: 14pt;
|
||||
line-height: 1.6;
|
||||
margin: 15px 0;
|
||||
}
|
||||
h1 {
|
||||
text-align: center;
|
||||
color: #333;
|
||||
margin-bottom: 30px;
|
||||
}
|
||||
h2 {
|
||||
color: #444;
|
||||
margin: 15px 0;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>PPT视频结构化摘要</h1>
|
||||
{% for page in pages %}
|
||||
<div class="page">
|
||||
<h2>页面 {{ page.num }}</h2>
|
||||
<div class="timestamp">{{ page.time }}</div>
|
||||
<img src="{{ page.image_path }}" alt="页面截图">
|
||||
<div class="content">{{ page.text }}</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
pages_data = []
|
||||
for idx, frame in enumerate(self.key_frames):
|
||||
try:
|
||||
img_filename = f"page_{idx}.jpg"
|
||||
img_path = os.path.join(abs_temp_img_dir, img_filename)
|
||||
|
||||
# 将numpy数组转换为PIL Image对象
|
||||
if len(frame.shape) == 3 and frame.shape[2] == 3:
|
||||
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
else:
|
||||
frame_rgb = frame
|
||||
|
||||
# 创建PIL Image对象
|
||||
img = Image.fromarray(frame_rgb)
|
||||
|
||||
# 调整图片大小
|
||||
max_width = 800
|
||||
if img.width > max_width:
|
||||
ratio = max_width / img.width
|
||||
new_height = int(img.height * ratio)
|
||||
img = img.resize((max_width, new_height), Image.Resampling.LANCZOS)
|
||||
|
||||
# 保存图片
|
||||
img.save(img_path, format='JPEG', quality=85, optimize=True)
|
||||
|
||||
# 获取从开始到当前帧的所有文本
|
||||
current_time = self.aligned_data[idx]['end_time']
|
||||
|
||||
# 收集从开始到当前时间点的所有文本
|
||||
texts = []
|
||||
for data in self.aligned_data:
|
||||
if data['end_time'] <= current_time:
|
||||
if data['text']:
|
||||
texts.append(data['text'])
|
||||
|
||||
# 合并文本
|
||||
combined_text = " ".join(texts)
|
||||
|
||||
# 如果只有一帧关键帧,显示整个视频的所有文本
|
||||
if len(self.key_frames) == 1:
|
||||
all_texts = []
|
||||
for data in self.aligned_data:
|
||||
if data['text']:
|
||||
all_texts.append(data['text'])
|
||||
combined_text = " ".join(all_texts)
|
||||
|
||||
# 添加file://前缀到图片路径
|
||||
img_path_with_prefix = f"file:///{img_path.replace(os.sep, '/')}"
|
||||
|
||||
pages_data.append({
|
||||
"num": idx + 1,
|
||||
"time": f"{self.aligned_data[idx]['start_time']:.1f}s - {self.aligned_data[idx]['end_time']:.1f}s",
|
||||
"image_path": img_path_with_prefix,
|
||||
"text": combined_text
|
||||
})
|
||||
except Exception as e:
|
||||
print(f"处理页面 {idx + 1} 时出错: {str(e)}")
|
||||
continue
|
||||
|
||||
# 生成HTML文件
|
||||
env = Environment()
|
||||
template = env.from_string(html_content)
|
||||
with open(temp_html, "w", encoding="utf-8") as f:
|
||||
f.write(template.render(pages=pages_data))
|
||||
|
||||
# 设置PDF生成选项
|
||||
options = {
|
||||
"enable-local-file-access": "",
|
||||
"encoding": "UTF-8",
|
||||
"margin-top": "15mm",
|
||||
"margin-bottom": "15mm",
|
||||
"margin-left": "15mm",
|
||||
"margin-right": "15mm",
|
||||
"quiet": "",
|
||||
"print-media-type": "",
|
||||
"page-size": "A4",
|
||||
"orientation": "Portrait"
|
||||
}
|
||||
config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH)
|
||||
|
||||
# 生成PDF
|
||||
pdfkit.from_file(
|
||||
temp_html,
|
||||
pdf_path,
|
||||
configuration=config,
|
||||
options=options
|
||||
)
|
||||
print(f"PDF已保存到: {pdf_path}")
|
||||
|
||||
finally:
|
||||
# 清理临时文件
|
||||
if os.path.exists(temp_html):
|
||||
os.remove(temp_html)
|
||||
if os.path.exists(temp_img_dir):
|
||||
for f in os.listdir(temp_img_dir):
|
||||
os.remove(os.path.join(temp_img_dir, f))
|
||||
os.rmdir(temp_img_dir)
|
||||
|
||||
except Exception as e:
|
||||
print(f"保存结果时出错: {str(e)}")
|
||||
|
||||
def process(self):
|
||||
"""处理视频并生成摘要"""
|
||||
try:
|
||||
start_time = time.time()
|
||||
print("开始处理视频...")
|
||||
|
||||
# 提取视频帧
|
||||
self.extract_frames()
|
||||
|
||||
# 创建线程池
|
||||
with ThreadPoolExecutor(max_workers=2) as executor:
|
||||
# 提交音频处理和帧处理任务
|
||||
audio_future = executor.submit(self.process_audio)
|
||||
frames_future = executor.submit(self.process_frames)
|
||||
|
||||
# 等待两个任务完成
|
||||
audio_future.result()
|
||||
frames_future.result()
|
||||
|
||||
print("生成摘要...")
|
||||
self.save_results()
|
||||
|
||||
end_time = time.time()
|
||||
print(f"处理完成!总耗时:{end_time - start_time:.2f}秒")
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理过程出错: {str(e)}")
|
||||
raise # 重新抛出异常以便调试
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 使用示例
|
||||
video_path = "D:/python项目文件/1/input3.mp4" # 替换为实际的视频路径
|
||||
output_dir = "custom_output" # 自定义输出路径
|
||||
summarizer = PPTSummarizer(video_path, output_dir)
|
||||
summarizer.process()
|
Loading…
Reference in New Issue
Block a user