Compare commits
No commits in common. "main" and "2" have entirely different histories.
BIN
3.0/input3.mp4
BIN
3.0/input3.mp4
Binary file not shown.
BIN
3.0/summary.pdf
BIN
3.0/summary.pdf
Binary file not shown.
576
3.0/毕设.py
576
3.0/毕设.py
@ -1,576 +0,0 @@
|
||||
import cv2
|
||||
import numpy as np
|
||||
from skimage.metrics import structural_similarity as ssim
|
||||
from moviepy.editor import VideoFileClip
|
||||
from PIL import Image
|
||||
import os
|
||||
from scipy.signal import find_peaks
|
||||
import torch
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
import time
|
||||
import whisper
|
||||
from collections import defaultdict
|
||||
import re
|
||||
from reportlab.lib.pagesizes import A4
|
||||
from reportlab.pdfgen import canvas
|
||||
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
||||
from reportlab.platypus import Paragraph, Image as RLImage
|
||||
from reportlab.lib.units import inch
|
||||
import threading
|
||||
import pdfkit
|
||||
from jinja2 import Environment
|
||||
import io
|
||||
import base64
|
||||
|
||||
# 全局配置
|
||||
SSIM_THRESHOLD = 0.85 # 关键帧去重阈值
|
||||
FRAME_INTERVAL = 2 # 抽帧间隔(秒)
|
||||
TRANSITION_WORDS = ["接下来", "下一页", "如图"] # 过渡词过滤列表
|
||||
WKHTMLTOPDF_PATH = r"D:\wkhtmltopdf\bin\wkhtmltopdf.exe"
|
||||
|
||||
|
||||
class PPTSummarizer:
|
||||
def __init__(self, video_path, output_dir=None):
|
||||
self.video_path = video_path
|
||||
# 如果没有指定输出目录,则使用默认目录
|
||||
if output_dir is None:
|
||||
# 使用视频文件名作为输出目录名
|
||||
video_name = os.path.splitext(os.path.basename(video_path))[0]
|
||||
self.output_dir = os.path.join("output", video_name)
|
||||
else:
|
||||
self.output_dir = output_dir
|
||||
|
||||
self.frames = []
|
||||
self.key_frames = []
|
||||
self.text_content = []
|
||||
self.frame_timestamps = []
|
||||
self.aligned_data = []
|
||||
self.processing_complete = threading.Event()
|
||||
|
||||
# 创建输出目录
|
||||
if not os.path.exists(self.output_dir):
|
||||
os.makedirs(self.output_dir)
|
||||
|
||||
# 初始化whisper模型
|
||||
self.whisper_model = whisper.load_model("tiny", device="cpu")
|
||||
|
||||
def extract_frames(self):
|
||||
"""提取视频帧"""
|
||||
try:
|
||||
# 使用VideoFileClip替代cv2
|
||||
video = VideoFileClip(self.video_path)
|
||||
duration = video.duration
|
||||
fps = video.fps
|
||||
|
||||
# 计算采样间隔
|
||||
sample_interval = max(1 / fps, FRAME_INTERVAL)
|
||||
timestamps = np.arange(0, duration, sample_interval)
|
||||
|
||||
print(f"开始提取帧,视频时长:{duration:.2f}秒,FPS:{fps}")
|
||||
|
||||
# 提取帧
|
||||
for t in timestamps:
|
||||
try:
|
||||
frame = video.get_frame(t)
|
||||
# 转换为BGR格式(OpenCV格式)
|
||||
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
|
||||
self.frames.append(frame_bgr)
|
||||
self.frame_timestamps.append(t)
|
||||
except Exception as e:
|
||||
print(f"提取帧 {t}s 时出错: {str(e)}")
|
||||
continue
|
||||
|
||||
video.close()
|
||||
print(f"成功提取 {len(self.frames)} 帧")
|
||||
return fps
|
||||
|
||||
except Exception as e:
|
||||
print(f"视频处理出错: {str(e)}")
|
||||
return 0
|
||||
|
||||
def process_audio(self):
|
||||
"""处理音频"""
|
||||
try:
|
||||
print("开始语音识别...")
|
||||
# 使用更大的模型以提高识别准确度
|
||||
result = self.whisper_model.transcribe(
|
||||
self.video_path,
|
||||
fp16=False,
|
||||
language="zh",
|
||||
task="transcribe",
|
||||
verbose=True
|
||||
)
|
||||
segments = result.get("segments", [])
|
||||
print(f"语音识别完成,共识别出 {len(segments)} 个片段")
|
||||
|
||||
# 打印识别结果
|
||||
for i, seg in enumerate(segments):
|
||||
try:
|
||||
print(f"片段 {i + 1}: {seg['start']:.1f}s - {seg['end']:.1f}s: {seg['text']}")
|
||||
except UnicodeEncodeError:
|
||||
print(f"片段 {i + 1}: {seg['start']:.1f}s - {seg['end']:.1f}s: [文本包含特殊字符]")
|
||||
|
||||
# 生成页面时间段
|
||||
intervals = []
|
||||
for i in range(len(self.frame_timestamps)):
|
||||
start = self.frame_timestamps[i]
|
||||
end = self.frame_timestamps[i + 1] if i < len(self.frame_timestamps) - 1 else self.frame_timestamps[
|
||||
-1] + 1
|
||||
intervals.append((start, end))
|
||||
print(f"页面 {i + 1} 时间段: {start:.1f}s - {end:.1f}s")
|
||||
|
||||
# 改进的对齐逻辑
|
||||
page_texts = defaultdict(list)
|
||||
page_segments = defaultdict(list) # 存储每个页面对应的片段
|
||||
all_texts = [] # 存储所有文本片段
|
||||
used_texts = set() # 跟踪已使用的文本
|
||||
|
||||
# 第一步:收集所有文本片段
|
||||
for seg in segments:
|
||||
try:
|
||||
seg_start = seg["start"]
|
||||
seg_end = seg["end"]
|
||||
seg_text = seg["text"].strip()
|
||||
all_texts.append((seg_start, seg_end, seg_text))
|
||||
except Exception as e:
|
||||
print(f"处理片段时出错: {str(e)}")
|
||||
continue
|
||||
|
||||
# 第二步:将文本分配到各个页面
|
||||
for start, end, text in all_texts:
|
||||
try:
|
||||
# 找到与当前片段时间重叠的所有页面
|
||||
overlapping_pages = []
|
||||
for page_idx, (page_start, page_end) in enumerate(intervals):
|
||||
if (start <= page_end and end >= page_start):
|
||||
overlapping_pages.append((page_idx, page_start, page_end))
|
||||
|
||||
# 如果找到重叠页面,将文本添加到最合适的页面
|
||||
if overlapping_pages:
|
||||
# 计算每个页面的重叠时间
|
||||
page_overlaps = []
|
||||
for page_idx, page_start, page_end in overlapping_pages:
|
||||
overlap_start = max(start, page_start)
|
||||
overlap_end = min(end, page_end)
|
||||
overlap_duration = overlap_end - overlap_start
|
||||
page_overlaps.append((page_idx, overlap_duration))
|
||||
|
||||
# 按重叠时间排序
|
||||
page_overlaps.sort(key=lambda x: x[1], reverse=True)
|
||||
|
||||
# 将文本添加到重叠时间最长的页面
|
||||
best_page = page_overlaps[0][0]
|
||||
if text not in used_texts: # 确保文本未被使用
|
||||
page_texts[best_page].append(text)
|
||||
page_segments[best_page].append((start, end, text))
|
||||
used_texts.add(text)
|
||||
print(f"将文本 '{text}' 添加到页面 {best_page + 1}")
|
||||
except Exception as e:
|
||||
print(f"分配文本时出错: {str(e)}")
|
||||
continue
|
||||
|
||||
# 第三步:优化每个页面的文本
|
||||
self.aligned_data = []
|
||||
for idx in range(len(intervals)):
|
||||
try:
|
||||
# 获取当前页面的所有片段
|
||||
segments = page_segments[idx]
|
||||
|
||||
# 按时间排序
|
||||
segments.sort(key=lambda x: x[0])
|
||||
|
||||
# 合并相邻的相似文本
|
||||
merged_texts = []
|
||||
current_text = ""
|
||||
last_end_time = 0
|
||||
|
||||
for start, end, text in segments:
|
||||
# 如果当前文本为空,直接添加
|
||||
if not current_text:
|
||||
current_text = text
|
||||
last_end_time = end
|
||||
continue
|
||||
|
||||
# 计算时间间隔
|
||||
time_gap = start - last_end_time
|
||||
|
||||
# 如果时间间隔小于3秒,合并文本
|
||||
if time_gap < 3.0: # 增加时间间隔阈值
|
||||
current_text += " " + text
|
||||
else:
|
||||
merged_texts.append(current_text)
|
||||
current_text = text
|
||||
|
||||
last_end_time = end
|
||||
|
||||
# 添加最后一个文本
|
||||
if current_text:
|
||||
merged_texts.append(current_text)
|
||||
|
||||
# 合并所有文本
|
||||
final_text = " ".join(merged_texts)
|
||||
|
||||
# 如果当前页面文本为空,尝试从前一页面获取
|
||||
if not final_text and idx > 0:
|
||||
final_text = self.aligned_data[idx - 1]["text"]
|
||||
|
||||
# 优化文本
|
||||
optimized_text = self.optimize_text(final_text)
|
||||
|
||||
if optimized_text:
|
||||
print(f"页面 {idx + 1} 的优化后文本内容: {optimized_text}")
|
||||
|
||||
self.aligned_data.append({
|
||||
"page": idx,
|
||||
"start_time": intervals[idx][0],
|
||||
"end_time": intervals[idx][1],
|
||||
"text": optimized_text
|
||||
})
|
||||
except Exception as e:
|
||||
print(f"处理页面 {idx + 1} 时出错: {str(e)}")
|
||||
# 添加空数据
|
||||
self.aligned_data.append({
|
||||
"page": idx,
|
||||
"start_time": intervals[idx][0],
|
||||
"end_time": intervals[idx][1],
|
||||
"text": ""
|
||||
})
|
||||
|
||||
# 第四步:确保所有文本都被包含
|
||||
# 检查是否有遗漏的文本片段
|
||||
for start, end, text in all_texts:
|
||||
try:
|
||||
if text not in used_texts:
|
||||
# 找到最后一个非空页面
|
||||
last_non_empty_page = -1
|
||||
for i in range(len(self.aligned_data) - 1, -1, -1):
|
||||
if self.aligned_data[i]["text"]:
|
||||
last_non_empty_page = i
|
||||
break
|
||||
|
||||
if last_non_empty_page >= 0:
|
||||
self.aligned_data[last_non_empty_page]["text"] += " " + text
|
||||
print(f"将遗漏的文本 '{text}' 添加到页面 {last_non_empty_page + 1}")
|
||||
except Exception as e:
|
||||
print(f"处理遗漏文本时出错: {str(e)}")
|
||||
continue
|
||||
|
||||
# 保存对齐数据到文件
|
||||
try:
|
||||
with open(os.path.join(self.output_dir, 'aligned_data.txt'), 'w', encoding='utf-8') as f:
|
||||
for data in self.aligned_data:
|
||||
f.write(f"页面 {data['page'] + 1}:\n")
|
||||
f.write(f"时间: {data['start_time']:.1f}s - {data['end_time']:.1f}s\n")
|
||||
f.write(f"文本: {data['text']}\n\n")
|
||||
print("对齐数据已保存到文件")
|
||||
except Exception as e:
|
||||
print(f"保存对齐数据时出错: {str(e)}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"音频处理出错: {str(e)}")
|
||||
self.aligned_data = []
|
||||
|
||||
def process_frames(self):
|
||||
"""处理视频帧"""
|
||||
try:
|
||||
print("开始处理视频帧...")
|
||||
# 计算帧间相似度
|
||||
similarities = []
|
||||
for i in range(len(self.frames) - 1):
|
||||
try:
|
||||
frame1 = cv2.cvtColor(self.frames[i], cv2.COLOR_BGR2GRAY)
|
||||
frame2 = cv2.cvtColor(self.frames[i + 1], cv2.COLOR_BGR2GRAY)
|
||||
similarity = ssim(frame1, frame2)
|
||||
similarities.append(similarity)
|
||||
except Exception as e:
|
||||
print(f"计算帧 {i} 相似度时出错: {str(e)}")
|
||||
similarities.append(1.0) # 出错时假设帧相似
|
||||
|
||||
# 使用自适应阈值
|
||||
mean_similarity = np.mean(similarities)
|
||||
std_similarity = np.std(similarities)
|
||||
adaptive_threshold = mean_similarity - 2 * std_similarity
|
||||
|
||||
# 使用峰值检测找到关键帧
|
||||
peaks, _ = find_peaks([1 - s for s in similarities],
|
||||
height=1 - adaptive_threshold,
|
||||
distance=int(len(similarities) / 20)) # 最小距离
|
||||
|
||||
# 保存关键帧
|
||||
for peak in peaks:
|
||||
if not self.is_blank_frame(self.frames[peak]):
|
||||
self.key_frames.append(self.frames[peak])
|
||||
|
||||
print(f"找到 {len(self.key_frames)} 个关键帧")
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理视频帧时出错: {str(e)}")
|
||||
|
||||
def is_blank_frame(self, frame, threshold=30):
|
||||
"""检测是否为空白帧"""
|
||||
try:
|
||||
# 转换为灰度图
|
||||
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||||
|
||||
# 计算图像统计特征
|
||||
mean = np.mean(gray)
|
||||
std_dev = np.std(gray)
|
||||
|
||||
# 检查是否为纯黑或纯白
|
||||
is_black = mean < 10 and std_dev < 5
|
||||
is_white = mean > 245 and std_dev < 5
|
||||
|
||||
# 检查是否有足够的细节
|
||||
has_detail = std_dev > threshold
|
||||
|
||||
return is_black or is_white or not has_detail
|
||||
except Exception as e:
|
||||
print(f"检查空白帧时出错: {str(e)}")
|
||||
return True
|
||||
|
||||
def optimize_text(self, text):
|
||||
"""文本优化"""
|
||||
try:
|
||||
if not text:
|
||||
return ""
|
||||
|
||||
# 过滤过渡词
|
||||
sentences = re.split(r'[。!?]', text)
|
||||
filtered = []
|
||||
seen = set()
|
||||
for sent in sentences:
|
||||
sent = sent.strip()
|
||||
if (len(sent) >= 10
|
||||
and not any(word in sent for word in TRANSITION_WORDS)
|
||||
and sent not in seen):
|
||||
filtered.append(sent)
|
||||
seen.add(sent)
|
||||
|
||||
result = '。'.join(filtered) + '。' if filtered else ""
|
||||
if result:
|
||||
print(f"优化后的文本: {result}")
|
||||
return result
|
||||
except Exception as e:
|
||||
print(f"文本优化时出错: {str(e)}")
|
||||
return text
|
||||
|
||||
def save_results(self):
|
||||
"""保存结果"""
|
||||
try:
|
||||
# 检查输出目录权限
|
||||
if not os.access(self.output_dir, os.W_OK):
|
||||
print(f"错误:没有写入权限: {self.output_dir}")
|
||||
return
|
||||
|
||||
# 生成PDF文档
|
||||
pdf_path = os.path.join(self.output_dir, 'summary.pdf')
|
||||
|
||||
# 创建临时HTML文件
|
||||
temp_html = os.path.join(self.output_dir, "_temp_pdf.html")
|
||||
temp_img_dir = os.path.join(self.output_dir, "_temp_pdf_images")
|
||||
os.makedirs(temp_img_dir, exist_ok=True)
|
||||
|
||||
try:
|
||||
# 使用绝对路径
|
||||
abs_temp_img_dir = os.path.abspath(temp_img_dir)
|
||||
|
||||
html_content = """
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<style>
|
||||
@page {
|
||||
margin: 20px;
|
||||
size: A4;
|
||||
}
|
||||
body {
|
||||
font-family: "Microsoft YaHei", "SimSun", sans-serif;
|
||||
line-height: 1.6;
|
||||
margin: 0;
|
||||
padding: 20px;
|
||||
}
|
||||
.page {
|
||||
page-break-inside: avoid;
|
||||
margin-bottom: 30px;
|
||||
padding: 20px;
|
||||
background-color: white;
|
||||
}
|
||||
img {
|
||||
max-width: 100%;
|
||||
height: auto;
|
||||
display: block;
|
||||
margin: 10px auto;
|
||||
}
|
||||
.timestamp {
|
||||
color: #666;
|
||||
font-size: 12pt;
|
||||
margin: 10px 0;
|
||||
}
|
||||
.content {
|
||||
font-size: 14pt;
|
||||
line-height: 1.6;
|
||||
margin: 15px 0;
|
||||
}
|
||||
h1 {
|
||||
text-align: center;
|
||||
color: #333;
|
||||
margin-bottom: 30px;
|
||||
}
|
||||
h2 {
|
||||
color: #444;
|
||||
margin: 15px 0;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>PPT视频结构化摘要</h1>
|
||||
{% for page in pages %}
|
||||
<div class="page">
|
||||
<h2>页面 {{ page.num }}</h2>
|
||||
<div class="timestamp">{{ page.time }}</div>
|
||||
<img src="{{ page.image_path }}" alt="页面截图">
|
||||
<div class="content">{{ page.text }}</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
pages_data = []
|
||||
for idx, frame in enumerate(self.key_frames):
|
||||
try:
|
||||
img_filename = f"page_{idx}.jpg"
|
||||
img_path = os.path.join(abs_temp_img_dir, img_filename)
|
||||
|
||||
# 将numpy数组转换为PIL Image对象
|
||||
if len(frame.shape) == 3 and frame.shape[2] == 3:
|
||||
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
else:
|
||||
frame_rgb = frame
|
||||
|
||||
# 创建PIL Image对象
|
||||
img = Image.fromarray(frame_rgb)
|
||||
|
||||
# 调整图片大小
|
||||
max_width = 800
|
||||
if img.width > max_width:
|
||||
ratio = max_width / img.width
|
||||
new_height = int(img.height * ratio)
|
||||
img = img.resize((max_width, new_height), Image.Resampling.LANCZOS)
|
||||
|
||||
# 保存图片
|
||||
img.save(img_path, format='JPEG', quality=85, optimize=True)
|
||||
|
||||
# 获取从开始到当前帧的所有文本
|
||||
current_time = self.aligned_data[idx]['end_time']
|
||||
|
||||
# 收集从开始到当前时间点的所有文本
|
||||
texts = []
|
||||
for data in self.aligned_data:
|
||||
if data['end_time'] <= current_time:
|
||||
if data['text']:
|
||||
texts.append(data['text'])
|
||||
|
||||
# 合并文本
|
||||
combined_text = " ".join(texts)
|
||||
|
||||
# 如果只有一帧关键帧,显示整个视频的所有文本
|
||||
if len(self.key_frames) == 1:
|
||||
all_texts = []
|
||||
for data in self.aligned_data:
|
||||
if data['text']:
|
||||
all_texts.append(data['text'])
|
||||
combined_text = " ".join(all_texts)
|
||||
|
||||
# 添加file://前缀到图片路径
|
||||
img_path_with_prefix = f"file:///{img_path.replace(os.sep, '/')}"
|
||||
|
||||
pages_data.append({
|
||||
"num": idx + 1,
|
||||
"time": f"{self.aligned_data[idx]['start_time']:.1f}s - {self.aligned_data[idx]['end_time']:.1f}s",
|
||||
"image_path": img_path_with_prefix,
|
||||
"text": combined_text
|
||||
})
|
||||
except Exception as e:
|
||||
print(f"处理页面 {idx + 1} 时出错: {str(e)}")
|
||||
continue
|
||||
|
||||
# 生成HTML文件
|
||||
env = Environment()
|
||||
template = env.from_string(html_content)
|
||||
with open(temp_html, "w", encoding="utf-8") as f:
|
||||
f.write(template.render(pages=pages_data))
|
||||
|
||||
# 设置PDF生成选项
|
||||
options = {
|
||||
"enable-local-file-access": "",
|
||||
"encoding": "UTF-8",
|
||||
"margin-top": "15mm",
|
||||
"margin-bottom": "15mm",
|
||||
"margin-left": "15mm",
|
||||
"margin-right": "15mm",
|
||||
"quiet": "",
|
||||
"print-media-type": "",
|
||||
"page-size": "A4",
|
||||
"orientation": "Portrait"
|
||||
}
|
||||
config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH)
|
||||
|
||||
# 生成PDF
|
||||
pdfkit.from_file(
|
||||
temp_html,
|
||||
pdf_path,
|
||||
configuration=config,
|
||||
options=options
|
||||
)
|
||||
print(f"PDF已保存到: {pdf_path}")
|
||||
|
||||
finally:
|
||||
# 清理临时文件
|
||||
if os.path.exists(temp_html):
|
||||
os.remove(temp_html)
|
||||
if os.path.exists(temp_img_dir):
|
||||
for f in os.listdir(temp_img_dir):
|
||||
os.remove(os.path.join(temp_img_dir, f))
|
||||
os.rmdir(temp_img_dir)
|
||||
|
||||
except Exception as e:
|
||||
print(f"保存结果时出错: {str(e)}")
|
||||
|
||||
def process(self):
|
||||
"""处理视频并生成摘要"""
|
||||
try:
|
||||
start_time = time.time()
|
||||
print("开始处理视频...")
|
||||
|
||||
# 提取视频帧
|
||||
self.extract_frames()
|
||||
|
||||
# 创建线程池
|
||||
with ThreadPoolExecutor(max_workers=2) as executor:
|
||||
# 提交音频处理和帧处理任务
|
||||
audio_future = executor.submit(self.process_audio)
|
||||
frames_future = executor.submit(self.process_frames)
|
||||
|
||||
# 等待两个任务完成
|
||||
audio_future.result()
|
||||
frames_future.result()
|
||||
|
||||
print("生成摘要...")
|
||||
self.save_results()
|
||||
|
||||
end_time = time.time()
|
||||
print(f"处理完成!总耗时:{end_time - start_time:.2f}秒")
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理过程出错: {str(e)}")
|
||||
raise # 重新抛出异常以便调试
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 使用示例
|
||||
video_path = "D:/python项目文件/1/input3.mp4" # 替换为实际的视频路径
|
||||
output_dir = "custom_output" # 自定义输出路径
|
||||
summarizer = PPTSummarizer(video_path, output_dir)
|
||||
summarizer.process()
|
BIN
4,0/summary.pdf
BIN
4,0/summary.pdf
Binary file not shown.
525
4,0/毕设.py
525
4,0/毕设.py
@ -1,525 +0,0 @@
|
||||
import os
|
||||
import re
|
||||
import base64
|
||||
import warnings
|
||||
import imageio
|
||||
import whisper
|
||||
import numpy as np
|
||||
import pdfkit
|
||||
from PIL import Image
|
||||
from skimage.metrics import structural_similarity as ssim
|
||||
from collections import defaultdict
|
||||
import subprocess
|
||||
from jinja2 import Environment
|
||||
import cv2
|
||||
from scipy.signal import find_peaks
|
||||
from skimage.feature import hog
|
||||
from skimage.color import rgb2gray
|
||||
|
||||
# ======================== 全局配置 ========================
|
||||
warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead")
|
||||
VIDEO_PATH = "D:/python项目文件/1/input.mp4" # 输入视频路径
|
||||
MODEL_DIR = "D:/whisper_models" # Whisper模型目录
|
||||
FFMPEG_BIN = r"D:\Program Files\ffmpeg\bin" # FFmpeg安装路径
|
||||
WKHTMLTOPDF_PATH = r"D:\wkhtmltopdf\bin\wkhtmltopdf.exe" # wkhtmltopdf路径
|
||||
SSIM_THRESHOLD = 0.85 # 关键帧去重阈值
|
||||
FRAME_INTERVAL = 2 # 抽帧间隔(秒)
|
||||
OUTPUT_DIR = "D:\桌面文件\python\output" # 输出目录
|
||||
TRANSITION_WORDS = ["接下来", "下一页", "如图"] # 过渡词过滤列
|
||||
HOG_THRESHOLD = 0.7 # HOG特征相似度阈值
|
||||
COLOR_THRESHOLD = 0.8 # 颜色直方图相似度阈值
|
||||
WHISPER_MODEL = "base" # Whisper模型大小
|
||||
PROFESSIONAL_TERMS = {
|
||||
"人工智能": "AI",
|
||||
"机器学习": "ML",
|
||||
"深度学习": "DL",
|
||||
"神经网络": "NN",
|
||||
"卷积神经网络": "CNN",
|
||||
"循环神经网络": "RNN",
|
||||
"自然语言处理": "NLP",
|
||||
"计算机视觉": "CV",
|
||||
"大数据": "Big Data",
|
||||
"云计算": "Cloud Computing"
|
||||
} # 专业术语词典
|
||||
|
||||
|
||||
# ========================================================
|
||||
|
||||
# ---------------------- 核心功能模块 ----------------------
|
||||
class VideoProcessor:
|
||||
def __init__(self):
|
||||
os.environ["PATH"] = FFMPEG_BIN + os.pathsep + os.environ["PATH"]
|
||||
|
||||
@staticmethod
|
||||
def check_ffmpeg():
|
||||
"""验证FFmpeg可用性"""
|
||||
try:
|
||||
subprocess.run(["ffmpeg", "-version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
print("[系统] FFmpeg验证成功")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[错误] FFmpeg验证失败: {str(e)}")
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def calculate_color_histogram(frame):
|
||||
"""计算颜色直方图特征"""
|
||||
hist = cv2.calcHist([frame], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
|
||||
cv2.normalize(hist, hist)
|
||||
return hist.flatten()
|
||||
|
||||
@staticmethod
|
||||
def calculate_hog_features(frame):
|
||||
"""计算HOG特征"""
|
||||
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||||
features = hog(gray, orientations=8, pixels_per_cell=(16, 16),
|
||||
cells_per_block=(1, 1), visualize=False)
|
||||
return features
|
||||
|
||||
@staticmethod
|
||||
def is_ppt_transition(frame1, frame2):
|
||||
"""检测PPT页面切换"""
|
||||
# 转换为灰度图
|
||||
gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
|
||||
gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
|
||||
|
||||
# 计算边缘
|
||||
edges1 = cv2.Canny(gray1, 100, 200)
|
||||
edges2 = cv2.Canny(gray2, 100, 200)
|
||||
|
||||
# 计算边缘差异
|
||||
diff = cv2.absdiff(edges1, edges2)
|
||||
return np.mean(diff) > 50 # 阈值可调整
|
||||
|
||||
@staticmethod
|
||||
def extract_keyframes(video_path: str) -> tuple:
|
||||
"""提取去重关键帧及其时间戳(多特征融合)"""
|
||||
try:
|
||||
reader = imageio.get_reader(video_path)
|
||||
fps = reader.get_meta_data()["fps"]
|
||||
keyframes = []
|
||||
timestamps = []
|
||||
prev_frame = None
|
||||
prev_features = None
|
||||
|
||||
for idx, frame in enumerate(reader):
|
||||
curr_time = idx / fps
|
||||
if curr_time - (timestamps[-1] if timestamps else 0) < FRAME_INTERVAL:
|
||||
continue
|
||||
|
||||
# 多特征相似度计算
|
||||
if prev_frame is not None:
|
||||
# 1. SSIM相似度
|
||||
gray_prev = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
|
||||
gray_curr = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||||
ssim_score = ssim(gray_prev, gray_curr)
|
||||
|
||||
# 2. 颜色直方图相似度
|
||||
hist_prev = VideoProcessor.calculate_color_histogram(prev_frame)
|
||||
hist_curr = VideoProcessor.calculate_color_histogram(frame)
|
||||
color_sim = cv2.compareHist(hist_prev, hist_curr, cv2.HISTCMP_CORREL)
|
||||
|
||||
# 3. HOG特征相似度
|
||||
hog_prev = VideoProcessor.calculate_hog_features(prev_frame)
|
||||
hog_curr = VideoProcessor.calculate_hog_features(frame)
|
||||
hog_sim = np.dot(hog_prev, hog_curr) / (np.linalg.norm(hog_prev) * np.linalg.norm(hog_curr))
|
||||
|
||||
# 4. PPT页面切换检测
|
||||
is_transition = VideoProcessor.is_ppt_transition(prev_frame, frame)
|
||||
|
||||
# 综合判断
|
||||
if (ssim_score > SSIM_THRESHOLD and
|
||||
color_sim > COLOR_THRESHOLD and
|
||||
hog_sim > HOG_THRESHOLD and
|
||||
not is_transition):
|
||||
continue
|
||||
|
||||
keyframes.append(Image.fromarray(frame))
|
||||
timestamps.append(curr_time)
|
||||
prev_frame = frame
|
||||
|
||||
reader.close()
|
||||
print(f"[图像] 关键帧提取完成,共{len(keyframes)}帧")
|
||||
return keyframes, timestamps
|
||||
except Exception as e:
|
||||
print(f"[错误] 关键帧提取失败: {str(e)}")
|
||||
return [], []
|
||||
|
||||
@staticmethod
|
||||
def transcribe_audio(video_path: str, model_name: str = WHISPER_MODEL) -> list:
|
||||
"""语音识别与时间戳获取(支持中英文混合)"""
|
||||
try:
|
||||
# 使用更大的模型提高准确率
|
||||
model = whisper.load_model(model_name, device="cpu", download_root=MODEL_DIR)
|
||||
|
||||
# 配置转写参数
|
||||
result = model.transcribe(
|
||||
video_path,
|
||||
fp16=False,
|
||||
language="zh",
|
||||
task="transcribe",
|
||||
verbose=True,
|
||||
initial_prompt="这是一段包含中英文的PPT讲解视频,可能包含专业术语。"
|
||||
)
|
||||
|
||||
segments = result.get("segments", [])
|
||||
|
||||
# 后处理:专业术语替换
|
||||
for seg in segments:
|
||||
text = seg["text"]
|
||||
for cn, en in PROFESSIONAL_TERMS.items():
|
||||
text = text.replace(cn, f"{cn}({en})")
|
||||
seg["text"] = text
|
||||
|
||||
return segments
|
||||
except Exception as e:
|
||||
print(f"[错误] 语音识别失败: {str(e)}")
|
||||
return []
|
||||
|
||||
|
||||
# ---------------------- 业务逻辑模块 ----------------------
|
||||
class ContentAligner:
|
||||
@staticmethod
|
||||
def generate_page_intervals(timestamps: list, duration: float) -> list:
|
||||
"""生成页面时间段"""
|
||||
intervals = []
|
||||
for i in range(len(timestamps)):
|
||||
start = timestamps[i]
|
||||
end = timestamps[i + 1] if i < len(timestamps) - 1 else duration
|
||||
intervals.append((start, end))
|
||||
return intervals
|
||||
|
||||
@staticmethod
|
||||
def calculate_text_similarity(text1: str, text2: str) -> float:
|
||||
"""计算文本相似度"""
|
||||
# 使用简单的词重叠度计算
|
||||
words1 = set(re.findall(r'\w+', text1.lower()))
|
||||
words2 = set(re.findall(r'\w+', text2.lower()))
|
||||
if not words1 or not words2:
|
||||
return 0.0
|
||||
intersection = words1.intersection(words2)
|
||||
union = words1.union(words2)
|
||||
return len(intersection) / len(union)
|
||||
|
||||
@staticmethod
|
||||
def find_best_match(segments: list, intervals: list) -> dict:
|
||||
"""为每个语音片段找到最佳匹配的页面"""
|
||||
page_texts = defaultdict(list)
|
||||
unmatched_segments = []
|
||||
|
||||
for seg in segments:
|
||||
seg_start = seg["start"]
|
||||
best_match = None
|
||||
best_score = 0.0
|
||||
|
||||
# 1. 首先尝试时间戳匹配
|
||||
for page_idx, (start, end) in enumerate(intervals):
|
||||
if start <= seg_start < end:
|
||||
best_match = page_idx
|
||||
break
|
||||
|
||||
# 2. 如果时间戳匹配失败,尝试文本相似度匹配
|
||||
if best_match is None:
|
||||
for page_idx, (start, end) in enumerate(intervals):
|
||||
# 获取该页面的所有文本
|
||||
page_text = " ".join([s["text"] for s in segments if start <= s["start"] < end])
|
||||
similarity = ContentAligner.calculate_text_similarity(seg["text"], page_text)
|
||||
if similarity > best_score:
|
||||
best_score = similarity
|
||||
best_match = page_idx
|
||||
|
||||
# 3. 如果找到匹配,添加到对应页面
|
||||
if best_match is not None:
|
||||
page_texts[best_match].append(seg)
|
||||
else:
|
||||
unmatched_segments.append(seg)
|
||||
|
||||
# 4. 处理未匹配的片段
|
||||
if unmatched_segments:
|
||||
print(f"[警告] 发现{len(unmatched_segments)}个未匹配的语音片段")
|
||||
# 将未匹配片段添加到最近的页面
|
||||
for seg in unmatched_segments:
|
||||
closest_page = min(range(len(intervals)),
|
||||
key=lambda i: abs(seg["start"] - (intervals[i][0] + intervals[i][1]) / 2))
|
||||
page_texts[closest_page].append(seg)
|
||||
|
||||
return page_texts
|
||||
|
||||
@staticmethod
|
||||
def align_content(video_path: str, timestamps: list) -> list:
|
||||
"""语音-画面对齐主逻辑(改进版)"""
|
||||
try:
|
||||
reader = imageio.get_reader(video_path)
|
||||
duration = reader.get_meta_data()["duration"]
|
||||
reader.close()
|
||||
except:
|
||||
duration = timestamps[-1] + FRAME_INTERVAL
|
||||
|
||||
segments = VideoProcessor.transcribe_audio(video_path)
|
||||
intervals = ContentAligner.generate_page_intervals(timestamps, duration)
|
||||
|
||||
# 使用改进的匹配算法
|
||||
page_texts = ContentAligner.find_best_match(segments, intervals)
|
||||
|
||||
# 生成最终的对齐数据
|
||||
aligned_data = []
|
||||
for idx in range(len(intervals)):
|
||||
text = " ".join([seg["text"] for seg in page_texts.get(idx, [])])
|
||||
aligned_data.append({
|
||||
"page": idx,
|
||||
"start_time": intervals[idx][0],
|
||||
"end_time": intervals[idx][1],
|
||||
"text": text
|
||||
})
|
||||
|
||||
return aligned_data
|
||||
|
||||
|
||||
# ---------------------- 摘要生成模块 ----------------------
|
||||
class SummaryGenerator:
|
||||
@staticmethod
|
||||
def optimize_text(text: str) -> str:
|
||||
"""文本浓缩优化"""
|
||||
sentences = re.split(r'[。!?]', text)
|
||||
filtered = []
|
||||
seen = set()
|
||||
for sent in sentences:
|
||||
sent = sent.strip()
|
||||
if (len(sent) >= 10
|
||||
and not any(word in sent for word in TRANSITION_WORDS)
|
||||
and sent not in seen):
|
||||
filtered.append(sent)
|
||||
seen.add(sent)
|
||||
return '。'.join(filtered) + '。' if filtered else ""
|
||||
|
||||
@staticmethod
|
||||
def generate_html(aligned_data: list, keyframes: list, output_dir: str):
|
||||
"""生成HTML报告"""
|
||||
pages_data = []
|
||||
temp_img_dir = os.path.join(output_dir, "_temp_images")
|
||||
os.makedirs(temp_img_dir, exist_ok=True)
|
||||
|
||||
try:
|
||||
for idx, frame in enumerate(keyframes):
|
||||
img_path = os.path.join(temp_img_dir, f"page_{idx}.jpg")
|
||||
frame.save(img_path)
|
||||
with open(img_path, "rb") as f:
|
||||
img_data = base64.b64encode(f.read()).decode("utf-8")
|
||||
|
||||
pages_data.append({
|
||||
"num": idx + 1,
|
||||
"time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s",
|
||||
"image": f"data:image/jpeg;base64,{img_data}",
|
||||
"text": SummaryGenerator.optimize_text(aligned_data[idx]["text"])
|
||||
})
|
||||
|
||||
env = Environment()
|
||||
template = env.from_string("""
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>PPT视频摘要报告</title>
|
||||
<style>
|
||||
.page { margin: 20px; padding: 15px; border: 1px solid #eee; }
|
||||
img { max-width: 800px; height: auto; }
|
||||
.timestamp { color: #666; font-size: 0.9em; }
|
||||
.content { margin-top: 10px; }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>PPT视频结构化摘要</h1>
|
||||
{% for page in pages %}
|
||||
<div class="page">
|
||||
<h2>页面 {{ page.num }}</h2>
|
||||
<div class="timestamp">{{ page.time }}</div>
|
||||
<img src="{{ page.image }}" alt="页面截图">
|
||||
<div class="content">{{ page.text }}</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</body>
|
||||
</html>
|
||||
""")
|
||||
|
||||
output_path = os.path.join(output_dir, "summary.html")
|
||||
with open(output_path, "w", encoding="utf-8") as f:
|
||||
f.write(template.render(pages=pages_data))
|
||||
print(f"[输出] HTML报告已生成: {output_path}")
|
||||
finally:
|
||||
for f in os.listdir(temp_img_dir):
|
||||
os.remove(os.path.join(temp_img_dir, f))
|
||||
os.rmdir(temp_img_dir)
|
||||
|
||||
@staticmethod
|
||||
def generate_pdf(aligned_data: list, keyframes: list, output_dir: str):
|
||||
"""生成PDF报告(优化版)"""
|
||||
temp_html = os.path.join(output_dir, "_temp_pdf.html")
|
||||
temp_img_dir = os.path.join(output_dir, "_temp_pdf_images")
|
||||
os.makedirs(temp_img_dir, exist_ok=True)
|
||||
|
||||
try:
|
||||
# 使用绝对路径
|
||||
abs_temp_img_dir = os.path.abspath(temp_img_dir)
|
||||
|
||||
html_content = """
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<style>
|
||||
@page {
|
||||
margin: 20mm;
|
||||
size: A4;
|
||||
}
|
||||
body {
|
||||
font-family: "Microsoft YaHei", "SimSun", sans-serif;
|
||||
line-height: 1.6;
|
||||
color: #333;
|
||||
}
|
||||
.page {
|
||||
page-break-inside: avoid;
|
||||
margin-bottom: 30px;
|
||||
padding: 20px;
|
||||
border: 1px solid #eee;
|
||||
border-radius: 5px;
|
||||
}
|
||||
.page-number {
|
||||
text-align: center;
|
||||
font-size: 24pt;
|
||||
font-weight: bold;
|
||||
margin-bottom: 20px;
|
||||
color: #2c3e50;
|
||||
}
|
||||
.timestamp {
|
||||
color: #666;
|
||||
font-size: 12pt;
|
||||
margin-bottom: 15px;
|
||||
}
|
||||
.image-container {
|
||||
text-align: center;
|
||||
margin: 20px 0;
|
||||
}
|
||||
img {
|
||||
max-width: 90% !important;
|
||||
height: auto;
|
||||
display: block;
|
||||
margin: 0 auto;
|
||||
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
|
||||
}
|
||||
.content {
|
||||
font-size: 14pt;
|
||||
line-height: 1.8;
|
||||
margin-top: 20px;
|
||||
padding: 15px;
|
||||
background: #f9f9f9;
|
||||
border-radius: 5px;
|
||||
}
|
||||
.professional-term {
|
||||
color: #2980b9;
|
||||
font-weight: bold;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1 style="text-align: center; color: #2c3e50; margin-bottom: 40px;">PPT视频结构化摘要</h1>
|
||||
{% for page in pages %}
|
||||
<div class="page">
|
||||
<div class="page-number">第 {{ page.num }} 页</div>
|
||||
<div class="timestamp">时间区间:{{ page.time }}</div>
|
||||
<div class="image-container">
|
||||
<img src="{{ page.image_path }}" alt="页面截图">
|
||||
</div>
|
||||
<div class="content">{{ page.text }}</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
pages_data = []
|
||||
for idx, frame in enumerate(keyframes):
|
||||
img_filename = f"page_{idx}.jpg"
|
||||
img_path = os.path.join(abs_temp_img_dir, img_filename)
|
||||
frame.save(img_path)
|
||||
pages_data.append({
|
||||
"num": idx + 1,
|
||||
"time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s",
|
||||
"image_path": img_path,
|
||||
"text": SummaryGenerator.optimize_text(aligned_data[idx]["text"])
|
||||
})
|
||||
|
||||
env = Environment()
|
||||
template = env.from_string(html_content)
|
||||
with open(temp_html, "w", encoding="utf-8") as f:
|
||||
f.write(template.render(pages=pages_data))
|
||||
|
||||
# PDF生成选项
|
||||
options = {
|
||||
"enable-local-file-access": "",
|
||||
"encoding": "UTF-8",
|
||||
"margin-top": "20mm",
|
||||
"margin-bottom": "20mm",
|
||||
"margin-left": "20mm",
|
||||
"margin-right": "20mm",
|
||||
"no-stop-slow-scripts": "",
|
||||
"quiet": "",
|
||||
"dpi": "300",
|
||||
"image-quality": "100",
|
||||
"enable-smart-shrinking": "",
|
||||
"print-media-type": ""
|
||||
}
|
||||
config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH)
|
||||
|
||||
pdf_path = os.path.join(output_dir, "summary.pdf")
|
||||
pdfkit.from_file(
|
||||
temp_html,
|
||||
pdf_path,
|
||||
configuration=config,
|
||||
options=options
|
||||
)
|
||||
print(f"[输出] PDF报告已生成: {pdf_path}")
|
||||
|
||||
finally:
|
||||
# 清理临时文件
|
||||
if os.path.exists(temp_html):
|
||||
os.remove(temp_html)
|
||||
if os.path.exists(temp_img_dir):
|
||||
for f in os.listdir(temp_img_dir):
|
||||
os.remove(os.path.join(temp_img_dir, f))
|
||||
os.rmdir(temp_img_dir)
|
||||
|
||||
@classmethod
|
||||
def generate_all(cls, aligned_data: list, keyframes: list, output_dir: str):
|
||||
"""生成所有格式报告"""
|
||||
cls.generate_html(aligned_data, keyframes, output_dir)
|
||||
cls.generate_pdf(aligned_data, keyframes, output_dir)
|
||||
|
||||
|
||||
# ---------------------- 主流程控制 ----------------------
|
||||
def main_process():
|
||||
# 环境检查
|
||||
processor = VideoProcessor()
|
||||
if not processor.check_ffmpeg():
|
||||
return
|
||||
if not os.path.exists(VIDEO_PATH):
|
||||
print(f"[错误] 视频文件不存在: {VIDEO_PATH}")
|
||||
return
|
||||
|
||||
# 关键帧提取
|
||||
keyframes, timestamps = processor.extract_keyframes(VIDEO_PATH)
|
||||
if not keyframes:
|
||||
print("[错误] 未提取到关键帧")
|
||||
return
|
||||
|
||||
# 内容对齐
|
||||
aligned_data = ContentAligner.align_content(VIDEO_PATH, timestamps)
|
||||
if not aligned_data:
|
||||
print("[警告] 未识别到有效语音内容")
|
||||
|
||||
# 生成摘要
|
||||
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
||||
SummaryGenerator.generate_all(aligned_data, keyframes, OUTPUT_DIR)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main_process()
|
BIN
5.0/summary.pdf
BIN
5.0/summary.pdf
Binary file not shown.
548
5.0/毕设.py
548
5.0/毕设.py
@ -1,548 +0,0 @@
|
||||
import os
|
||||
import re
|
||||
import base64
|
||||
import warnings
|
||||
import imageio
|
||||
import whisper
|
||||
import numpy as np
|
||||
import pdfkit
|
||||
from PIL import Image
|
||||
from skimage.metrics import structural_similarity as ssim
|
||||
from collections import defaultdict
|
||||
import subprocess
|
||||
from jinja2 import Environment
|
||||
import cv2
|
||||
from scipy.signal import find_peaks
|
||||
from skimage.feature import hog
|
||||
from skimage.color import rgb2gray
|
||||
|
||||
# ======================== 全局配置 ========================
|
||||
warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead")
|
||||
VIDEO_PATH = "D:/python项目文件/1/input3.mp4" # 输入视频路径
|
||||
MODEL_DIR = "D:/whisper_models" # Whisper模型目录
|
||||
FFMPEG_BIN = r"D:\Program Files\ffmpeg\bin" # FFmpeg安装路径
|
||||
WKHTMLTOPDF_PATH = r"D:\wkhtmltopdf\bin\wkhtmltopdf.exe" # wkhtmltopdf路径
|
||||
SSIM_THRESHOLD = 0.85 # 关键帧去重阈值
|
||||
FRAME_INTERVAL = 2 # 抽帧间隔(秒)
|
||||
OUTPUT_DIR = "D:\桌面文件\python\output" # 输出目录
|
||||
TRANSITION_WORDS = ["接下来", "下一页", "如图"] # 过渡词过滤列
|
||||
HOG_THRESHOLD = 0.7 # HOG特征相似度阈值
|
||||
COLOR_THRESHOLD = 0.8 # 颜色直方图相似度阈值
|
||||
WHISPER_MODEL = "base" # Whisper模型大小
|
||||
PROFESSIONAL_TERMS = {
|
||||
"人工智能": "AI",
|
||||
"机器学习": "ML",
|
||||
"深度学习": "DL",
|
||||
"神经网络": "NN",
|
||||
"卷积神经网络": "CNN",
|
||||
"循环神经网络": "RNN",
|
||||
"自然语言处理": "NLP",
|
||||
"计算机视觉": "CV",
|
||||
"大数据": "Big Data",
|
||||
"云计算": "Cloud Computing"
|
||||
} # 专业术语词典
|
||||
|
||||
|
||||
# ========================================================
|
||||
|
||||
# ---------------------- 核心功能模块 ----------------------
|
||||
class VideoProcessor:
|
||||
def __init__(self):
|
||||
os.environ["PATH"] = FFMPEG_BIN + os.pathsep + os.environ["PATH"]
|
||||
|
||||
@staticmethod
|
||||
def check_ffmpeg():
|
||||
"""验证FFmpeg可用性"""
|
||||
try:
|
||||
subprocess.run(["ffmpeg", "-version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
print("[系统] FFmpeg验证成功")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[错误] FFmpeg验证失败: {str(e)}")
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def calculate_color_histogram(frame):
|
||||
"""计算颜色直方图特征"""
|
||||
hist = cv2.calcHist([frame], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
|
||||
cv2.normalize(hist, hist)
|
||||
return hist.flatten()
|
||||
|
||||
@staticmethod
|
||||
def calculate_hog_features(frame):
|
||||
"""计算HOG特征"""
|
||||
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||||
features = hog(gray, orientations=8, pixels_per_cell=(16, 16),
|
||||
cells_per_block=(1, 1), visualize=False)
|
||||
return features
|
||||
|
||||
@staticmethod
|
||||
def is_ppt_transition(frame1, frame2):
|
||||
"""检测PPT页面切换"""
|
||||
# 转换为灰度图
|
||||
gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
|
||||
gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
|
||||
|
||||
# 计算边缘
|
||||
edges1 = cv2.Canny(gray1, 100, 200)
|
||||
edges2 = cv2.Canny(gray2, 100, 200)
|
||||
|
||||
# 计算边缘差异
|
||||
diff = cv2.absdiff(edges1, edges2)
|
||||
return np.mean(diff) > 50 # 阈值可调整
|
||||
|
||||
@staticmethod
|
||||
def extract_keyframes(video_path: str) -> tuple:
|
||||
"""提取去重关键帧及其时间戳(多特征融合)"""
|
||||
try:
|
||||
reader = imageio.get_reader(video_path)
|
||||
fps = reader.get_meta_data()["fps"]
|
||||
total_frames = reader.count_frames()
|
||||
print(f"[信息] 视频总帧数: {total_frames}")
|
||||
|
||||
keyframes = []
|
||||
timestamps = []
|
||||
prev_frame = None
|
||||
frame_count = 0
|
||||
last_progress = 0
|
||||
|
||||
for idx, frame in enumerate(reader):
|
||||
# 显示进度
|
||||
progress = int((idx / total_frames) * 100)
|
||||
if progress != last_progress and progress % 5 == 0: # 每5%显示一次进度
|
||||
print(f"[进度] 处理中: {progress}% ({idx}/{total_frames}帧)")
|
||||
last_progress = progress
|
||||
|
||||
curr_time = idx / fps
|
||||
if curr_time - (timestamps[-1] if timestamps else 0) < FRAME_INTERVAL:
|
||||
continue
|
||||
|
||||
# 多特征相似度计算
|
||||
if prev_frame is not None:
|
||||
try:
|
||||
# 1. SSIM相似度(使用简化版本)
|
||||
gray_prev = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
|
||||
gray_curr = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||||
ssim_score = ssim(gray_prev, gray_curr, win_size=3)
|
||||
|
||||
# 2. 颜色直方图相似度
|
||||
hist_prev = VideoProcessor.calculate_color_histogram(prev_frame)
|
||||
hist_curr = VideoProcessor.calculate_color_histogram(frame)
|
||||
color_sim = cv2.compareHist(hist_prev, hist_curr, cv2.HISTCMP_CORREL)
|
||||
|
||||
# 3. HOG特征相似度(仅在SSIM和颜色相似度较高时计算)
|
||||
if ssim_score > 0.8 and color_sim > 0.8:
|
||||
hog_prev = VideoProcessor.calculate_hog_features(prev_frame)
|
||||
hog_curr = VideoProcessor.calculate_hog_features(frame)
|
||||
hog_sim = np.dot(hog_prev, hog_curr) / (np.linalg.norm(hog_prev) * np.linalg.norm(hog_curr))
|
||||
else:
|
||||
hog_sim = 0 # 如果SSIM和颜色相似度低,直接跳过HOG计算
|
||||
|
||||
# 4. PPT页面切换检测
|
||||
is_transition = VideoProcessor.is_ppt_transition(prev_frame, frame)
|
||||
|
||||
# 综合判断
|
||||
if (ssim_score > SSIM_THRESHOLD and
|
||||
color_sim > COLOR_THRESHOLD and
|
||||
hog_sim > HOG_THRESHOLD and
|
||||
not is_transition):
|
||||
continue
|
||||
except Exception as e:
|
||||
print(f"[警告] 特征计算失败: {str(e)}")
|
||||
continue
|
||||
|
||||
keyframes.append(Image.fromarray(frame))
|
||||
timestamps.append(curr_time)
|
||||
prev_frame = frame
|
||||
frame_count += 1
|
||||
|
||||
# 每处理100帧强制垃圾回收
|
||||
if frame_count % 100 == 0:
|
||||
import gc
|
||||
gc.collect()
|
||||
|
||||
reader.close()
|
||||
print(f"[图像] 关键帧提取完成,共{len(keyframes)}帧")
|
||||
return keyframes, timestamps
|
||||
except Exception as e:
|
||||
print(f"[错误] 关键帧提取失败: {str(e)}")
|
||||
return [], []
|
||||
|
||||
@staticmethod
|
||||
def transcribe_audio(video_path: str, model_name: str = WHISPER_MODEL) -> list:
|
||||
"""语音识别与时间戳获取(支持中英文混合)"""
|
||||
try:
|
||||
# 使用更大的模型提高准确率
|
||||
model = whisper.load_model(model_name, device="cpu", download_root=MODEL_DIR)
|
||||
|
||||
# 配置转写参数
|
||||
result = model.transcribe(
|
||||
video_path,
|
||||
fp16=False,
|
||||
language="zh",
|
||||
task="transcribe",
|
||||
verbose=True,
|
||||
initial_prompt="这是一段包含中英文的PPT讲解视频,可能包含专业术语。"
|
||||
)
|
||||
|
||||
segments = result.get("segments", [])
|
||||
|
||||
# 后处理:专业术语替换
|
||||
for seg in segments:
|
||||
text = seg["text"]
|
||||
for cn, en in PROFESSIONAL_TERMS.items():
|
||||
text = text.replace(cn, f"{cn}({en})")
|
||||
seg["text"] = text
|
||||
|
||||
return segments
|
||||
except Exception as e:
|
||||
print(f"[错误] 语音识别失败: {str(e)}")
|
||||
return []
|
||||
|
||||
|
||||
# ---------------------- 业务逻辑模块 ----------------------
|
||||
class ContentAligner:
|
||||
@staticmethod
|
||||
def generate_page_intervals(timestamps: list, duration: float) -> list:
|
||||
"""生成页面时间段"""
|
||||
intervals = []
|
||||
for i in range(len(timestamps)):
|
||||
start = timestamps[i]
|
||||
end = timestamps[i + 1] if i < len(timestamps) - 1 else duration
|
||||
intervals.append((start, end))
|
||||
return intervals
|
||||
|
||||
@staticmethod
|
||||
def calculate_text_similarity(text1: str, text2: str) -> float:
|
||||
"""计算文本相似度"""
|
||||
# 使用简单的词重叠度计算
|
||||
words1 = set(re.findall(r'\w+', text1.lower()))
|
||||
words2 = set(re.findall(r'\w+', text2.lower()))
|
||||
if not words1 or not words2:
|
||||
return 0.0
|
||||
intersection = words1.intersection(words2)
|
||||
union = words1.union(words2)
|
||||
return len(intersection) / len(union)
|
||||
|
||||
@staticmethod
|
||||
def find_best_match(segments: list, intervals: list) -> dict:
|
||||
"""为每个语音片段找到最佳匹配的页面"""
|
||||
page_texts = defaultdict(list)
|
||||
unmatched_segments = []
|
||||
|
||||
for seg in segments:
|
||||
seg_start = seg["start"]
|
||||
best_match = None
|
||||
best_score = 0.0
|
||||
|
||||
# 1. 首先尝试时间戳匹配
|
||||
for page_idx, (start, end) in enumerate(intervals):
|
||||
if start <= seg_start < end:
|
||||
best_match = page_idx
|
||||
break
|
||||
|
||||
# 2. 如果时间戳匹配失败,尝试文本相似度匹配
|
||||
if best_match is None:
|
||||
for page_idx, (start, end) in enumerate(intervals):
|
||||
# 获取该页面的所有文本
|
||||
page_text = " ".join([s["text"] for s in segments if start <= s["start"] < end])
|
||||
similarity = ContentAligner.calculate_text_similarity(seg["text"], page_text)
|
||||
if similarity > best_score:
|
||||
best_score = similarity
|
||||
best_match = page_idx
|
||||
|
||||
# 3. 如果找到匹配,添加到对应页面
|
||||
if best_match is not None:
|
||||
page_texts[best_match].append(seg)
|
||||
else:
|
||||
unmatched_segments.append(seg)
|
||||
|
||||
# 4. 处理未匹配的片段
|
||||
if unmatched_segments:
|
||||
print(f"[警告] 发现{len(unmatched_segments)}个未匹配的语音片段")
|
||||
# 将未匹配片段添加到最近的页面
|
||||
for seg in unmatched_segments:
|
||||
closest_page = min(range(len(intervals)),
|
||||
key=lambda i: abs(seg["start"] - (intervals[i][0] + intervals[i][1]) / 2))
|
||||
page_texts[closest_page].append(seg)
|
||||
|
||||
return page_texts
|
||||
|
||||
@staticmethod
|
||||
def align_content(video_path: str, timestamps: list) -> list:
|
||||
"""语音-画面对齐主逻辑(改进版)"""
|
||||
try:
|
||||
reader = imageio.get_reader(video_path)
|
||||
duration = reader.get_meta_data()["duration"]
|
||||
reader.close()
|
||||
except:
|
||||
duration = timestamps[-1] + FRAME_INTERVAL
|
||||
|
||||
segments = VideoProcessor.transcribe_audio(video_path)
|
||||
intervals = ContentAligner.generate_page_intervals(timestamps, duration)
|
||||
|
||||
# 使用改进的匹配算法
|
||||
page_texts = ContentAligner.find_best_match(segments, intervals)
|
||||
|
||||
# 生成最终的对齐数据
|
||||
aligned_data = []
|
||||
for idx in range(len(intervals)):
|
||||
text = " ".join([seg["text"] for seg in page_texts.get(idx, [])])
|
||||
aligned_data.append({
|
||||
"page": idx,
|
||||
"start_time": intervals[idx][0],
|
||||
"end_time": intervals[idx][1],
|
||||
"text": text
|
||||
})
|
||||
|
||||
return aligned_data
|
||||
|
||||
|
||||
# ---------------------- 摘要生成模块 ----------------------
|
||||
class SummaryGenerator:
|
||||
@staticmethod
|
||||
def optimize_text(text: str) -> str:
|
||||
"""文本浓缩优化"""
|
||||
sentences = re.split(r'[。!?]', text)
|
||||
filtered = []
|
||||
seen = set()
|
||||
for sent in sentences:
|
||||
sent = sent.strip()
|
||||
if (len(sent) >= 10
|
||||
and not any(word in sent for word in TRANSITION_WORDS)
|
||||
and sent not in seen):
|
||||
filtered.append(sent)
|
||||
seen.add(sent)
|
||||
return '。'.join(filtered) + '。' if filtered else ""
|
||||
|
||||
@staticmethod
|
||||
def generate_html(aligned_data: list, keyframes: list, output_dir: str):
|
||||
"""生成HTML报告"""
|
||||
pages_data = []
|
||||
temp_img_dir = os.path.join(output_dir, "_temp_images")
|
||||
os.makedirs(temp_img_dir, exist_ok=True)
|
||||
|
||||
try:
|
||||
for idx, frame in enumerate(keyframes):
|
||||
img_path = os.path.join(temp_img_dir, f"page_{idx}.jpg")
|
||||
frame.save(img_path)
|
||||
with open(img_path, "rb") as f:
|
||||
img_data = base64.b64encode(f.read()).decode("utf-8")
|
||||
|
||||
pages_data.append({
|
||||
"num": idx + 1,
|
||||
"time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s",
|
||||
"image": f"data:image/jpeg;base64,{img_data}",
|
||||
"text": SummaryGenerator.optimize_text(aligned_data[idx]["text"])
|
||||
})
|
||||
|
||||
env = Environment()
|
||||
template = env.from_string("""
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>PPT视频摘要报告</title>
|
||||
<style>
|
||||
.page { margin: 20px; padding: 15px; border: 1px solid #eee; }
|
||||
img { max-width: 800px; height: auto; }
|
||||
.timestamp { color: #666; font-size: 0.9em; }
|
||||
.content { margin-top: 10px; }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>PPT视频结构化摘要</h1>
|
||||
{% for page in pages %}
|
||||
<div class="page">
|
||||
<h2>页面 {{ page.num }}</h2>
|
||||
<div class="timestamp">{{ page.time }}</div>
|
||||
<img src="{{ page.image }}" alt="页面截图">
|
||||
<div class="content">{{ page.text }}</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</body>
|
||||
</html>
|
||||
""")
|
||||
|
||||
output_path = os.path.join(output_dir, "summary.html")
|
||||
with open(output_path, "w", encoding="utf-8") as f:
|
||||
f.write(template.render(pages=pages_data))
|
||||
print(f"[输出] HTML报告已生成: {output_path}")
|
||||
finally:
|
||||
for f in os.listdir(temp_img_dir):
|
||||
os.remove(os.path.join(temp_img_dir, f))
|
||||
os.rmdir(temp_img_dir)
|
||||
|
||||
@staticmethod
|
||||
def generate_pdf(aligned_data: list, keyframes: list, output_dir: str):
|
||||
"""生成PDF报告(优化版)"""
|
||||
temp_html = os.path.join(output_dir, "_temp_pdf.html")
|
||||
temp_img_dir = os.path.join(output_dir, "_temp_pdf_images")
|
||||
os.makedirs(temp_img_dir, exist_ok=True)
|
||||
|
||||
try:
|
||||
# 使用绝对路径
|
||||
abs_temp_img_dir = os.path.abspath(temp_img_dir)
|
||||
|
||||
html_content = """
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<style>
|
||||
@page {
|
||||
margin: 20mm;
|
||||
size: A4;
|
||||
}
|
||||
body {
|
||||
font-family: "Microsoft YaHei", "SimSun", sans-serif;
|
||||
line-height: 1.6;
|
||||
color: #333;
|
||||
}
|
||||
.page {
|
||||
page-break-inside: avoid;
|
||||
margin-bottom: 30px;
|
||||
padding: 20px;
|
||||
border: 1px solid #eee;
|
||||
border-radius: 5px;
|
||||
}
|
||||
.page-number {
|
||||
text-align: center;
|
||||
font-size: 24pt;
|
||||
font-weight: bold;
|
||||
margin-bottom: 20px;
|
||||
color: #2c3e50;
|
||||
}
|
||||
.timestamp {
|
||||
color: #666;
|
||||
font-size: 12pt;
|
||||
margin-bottom: 15px;
|
||||
}
|
||||
.image-container {
|
||||
text-align: center;
|
||||
margin: 20px 0;
|
||||
}
|
||||
img {
|
||||
max-width: 90% !important;
|
||||
height: auto;
|
||||
display: block;
|
||||
margin: 0 auto;
|
||||
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
|
||||
}
|
||||
.content {
|
||||
font-size: 14pt;
|
||||
line-height: 1.8;
|
||||
margin-top: 20px;
|
||||
padding: 15px;
|
||||
background: #f9f9f9;
|
||||
border-radius: 5px;
|
||||
}
|
||||
.professional-term {
|
||||
color: #2980b9;
|
||||
font-weight: bold;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1 style="text-align: center; color: #2c3e50; margin-bottom: 40px;">PPT视频结构化摘要</h1>
|
||||
{% for page in pages %}
|
||||
<div class="page">
|
||||
<div class="page-number">第 {{ page.num }} 页</div>
|
||||
<div class="timestamp">时间区间:{{ page.time }}</div>
|
||||
<div class="image-container">
|
||||
<img src="{{ page.image_path }}" alt="页面截图">
|
||||
</div>
|
||||
<div class="content">{{ page.text }}</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
pages_data = []
|
||||
for idx, frame in enumerate(keyframes):
|
||||
img_filename = f"page_{idx}.jpg"
|
||||
img_path = os.path.join(abs_temp_img_dir, img_filename)
|
||||
frame.save(img_path)
|
||||
pages_data.append({
|
||||
"num": idx + 1,
|
||||
"time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s",
|
||||
"image_path": img_path,
|
||||
"text": SummaryGenerator.optimize_text(aligned_data[idx]["text"])
|
||||
})
|
||||
|
||||
env = Environment()
|
||||
template = env.from_string(html_content)
|
||||
with open(temp_html, "w", encoding="utf-8") as f:
|
||||
f.write(template.render(pages=pages_data))
|
||||
|
||||
# PDF生成选项
|
||||
options = {
|
||||
"enable-local-file-access": "",
|
||||
"encoding": "UTF-8",
|
||||
"margin-top": "20mm",
|
||||
"margin-bottom": "20mm",
|
||||
"margin-left": "20mm",
|
||||
"margin-right": "20mm",
|
||||
"no-stop-slow-scripts": "",
|
||||
"quiet": "",
|
||||
"dpi": "300",
|
||||
"image-quality": "100",
|
||||
"enable-smart-shrinking": "",
|
||||
"print-media-type": ""
|
||||
}
|
||||
config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH)
|
||||
|
||||
pdf_path = os.path.join(output_dir, "summary.pdf")
|
||||
pdfkit.from_file(
|
||||
temp_html,
|
||||
pdf_path,
|
||||
configuration=config,
|
||||
options=options
|
||||
)
|
||||
print(f"[输出] PDF报告已生成: {pdf_path}")
|
||||
|
||||
finally:
|
||||
# 清理临时文件
|
||||
if os.path.exists(temp_html):
|
||||
os.remove(temp_html)
|
||||
if os.path.exists(temp_img_dir):
|
||||
for f in os.listdir(temp_img_dir):
|
||||
os.remove(os.path.join(temp_img_dir, f))
|
||||
os.rmdir(temp_img_dir)
|
||||
|
||||
@classmethod
|
||||
def generate_all(cls, aligned_data: list, keyframes: list, output_dir: str):
|
||||
"""生成所有格式报告"""
|
||||
cls.generate_html(aligned_data, keyframes, output_dir)
|
||||
cls.generate_pdf(aligned_data, keyframes, output_dir)
|
||||
|
||||
|
||||
# ---------------------- 主流程控制 ----------------------
|
||||
def main_process():
|
||||
# 环境检查
|
||||
processor = VideoProcessor()
|
||||
if not processor.check_ffmpeg():
|
||||
return
|
||||
if not os.path.exists(VIDEO_PATH):
|
||||
print(f"[错误] 视频文件不存在: {VIDEO_PATH}")
|
||||
return
|
||||
|
||||
# 关键帧提取
|
||||
keyframes, timestamps = processor.extract_keyframes(VIDEO_PATH)
|
||||
if not keyframes:
|
||||
print("[错误] 未提取到关键帧")
|
||||
return
|
||||
|
||||
# 内容对齐
|
||||
aligned_data = ContentAligner.align_content(VIDEO_PATH, timestamps)
|
||||
if not aligned_data:
|
||||
print("[警告] 未识别到有效语音内容")
|
||||
|
||||
# 生成摘要
|
||||
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
||||
SummaryGenerator.generate_all(aligned_data, keyframes, OUTPUT_DIR)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main_process()
|
BIN
6.0/summary.pdf
BIN
6.0/summary.pdf
Binary file not shown.
575
6.0/毕设.py
575
6.0/毕设.py
@ -1,575 +0,0 @@
|
||||
import os
|
||||
import re
|
||||
import base64
|
||||
import warnings
|
||||
import imageio
|
||||
import whisper
|
||||
import numpy as np
|
||||
import pdfkit
|
||||
from PIL import Image
|
||||
from skimage.metrics import structural_similarity as ssim
|
||||
from collections import defaultdict
|
||||
import subprocess
|
||||
from jinja2 import Environment
|
||||
import cv2
|
||||
from scipy.signal import find_peaks
|
||||
from skimage.feature import hog
|
||||
from skimage.color import rgb2gray
|
||||
|
||||
# ======================== 全局配置 ========================
|
||||
warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead")
|
||||
VIDEO_PATH = "D:/python项目文件/1/input.mp4" # 输入视频路径
|
||||
MODEL_DIR = "D:/whisper_models" # Whisper模型目录
|
||||
FFMPEG_BIN = r"D:\Program Files\ffmpeg\bin" # FFmpeg安装路径
|
||||
WKHTMLTOPDF_PATH = r"D:\wkhtmltopdf\bin\wkhtmltopdf.exe" # wkhtmltopdf路径
|
||||
SSIM_THRESHOLD = 0.85 # 关键帧去重阈值
|
||||
FRAME_INTERVAL = 2 # 抽帧间隔(秒)
|
||||
OUTPUT_DIR = "D:\桌面文件\python\output" # 输出目录
|
||||
TRANSITION_WORDS = ["接下来", "下一页", "如图"] # 过渡词过滤列
|
||||
HOG_THRESHOLD = 0.7 # HOG特征相似度阈值
|
||||
COLOR_THRESHOLD = 0.8 # 颜色直方图相似度阈值
|
||||
WHISPER_MODEL = "base" # Whisper模型大小
|
||||
PROFESSIONAL_TERMS = {
|
||||
"人工智能": "AI",
|
||||
"机器学习": "ML",
|
||||
"深度学习": "DL",
|
||||
"神经网络": "NN",
|
||||
"卷积神经网络": "CNN",
|
||||
"循环神经网络": "RNN",
|
||||
"自然语言处理": "NLP",
|
||||
"计算机视觉": "CV",
|
||||
"大数据": "Big Data",
|
||||
"云计算": "Cloud Computing"
|
||||
} # 专业术语词典
|
||||
|
||||
|
||||
# ========================================================
|
||||
|
||||
# ---------------------- 核心功能模块 ----------------------
|
||||
class VideoProcessor:
|
||||
def __init__(self):
|
||||
os.environ["PATH"] = FFMPEG_BIN + os.pathsep + os.environ["PATH"]
|
||||
|
||||
@staticmethod
|
||||
def check_ffmpeg():
|
||||
"""验证FFmpeg可用性"""
|
||||
try:
|
||||
subprocess.run(["ffmpeg", "-version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
print("[系统] FFmpeg验证成功")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[错误] FFmpeg验证失败: {str(e)}")
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def calculate_color_histogram(frame):
|
||||
"""计算颜色直方图特征"""
|
||||
hist = cv2.calcHist([frame], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
|
||||
cv2.normalize(hist, hist)
|
||||
return hist.flatten()
|
||||
|
||||
@staticmethod
|
||||
def calculate_hog_features(frame):
|
||||
"""计算HOG特征"""
|
||||
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||||
features = hog(gray, orientations=8, pixels_per_cell=(16, 16),
|
||||
cells_per_block=(1, 1), visualize=False)
|
||||
return features
|
||||
|
||||
@staticmethod
|
||||
def is_ppt_transition(frame1, frame2):
|
||||
"""检测PPT页面切换"""
|
||||
# 转换为灰度图
|
||||
gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
|
||||
gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
|
||||
|
||||
# 计算边缘
|
||||
edges1 = cv2.Canny(gray1, 100, 200)
|
||||
edges2 = cv2.Canny(gray2, 100, 200)
|
||||
|
||||
# 计算边缘差异
|
||||
diff = cv2.absdiff(edges1, edges2)
|
||||
return np.mean(diff) > 50 # 阈值可调整
|
||||
|
||||
@staticmethod
|
||||
def extract_keyframes(video_path: str) -> tuple:
|
||||
"""提取去重关键帧及其时间戳(多特征融合)"""
|
||||
try:
|
||||
reader = imageio.get_reader(video_path)
|
||||
fps = reader.get_meta_data()["fps"]
|
||||
total_frames = reader.count_frames()
|
||||
print(f"[信息] 视频总帧数: {total_frames}")
|
||||
|
||||
keyframes = []
|
||||
timestamps = []
|
||||
prev_frame = None
|
||||
frame_count = 0
|
||||
last_progress = 0
|
||||
|
||||
for idx, frame in enumerate(reader):
|
||||
# 显示进度
|
||||
progress = int((idx / total_frames) * 100)
|
||||
if progress != last_progress and progress % 5 == 0: # 每5%显示一次进度
|
||||
print(f"[进度] 处理中: {progress}% ({idx}/{total_frames}帧)")
|
||||
last_progress = progress
|
||||
|
||||
curr_time = idx / fps
|
||||
if curr_time - (timestamps[-1] if timestamps else 0) < FRAME_INTERVAL:
|
||||
continue
|
||||
|
||||
# 检查是否为无信息帧(纯黑屏或纯白屏)
|
||||
if VideoProcessor.is_blank_frame(frame):
|
||||
continue
|
||||
|
||||
# 多特征相似度计算
|
||||
if prev_frame is not None:
|
||||
try:
|
||||
# 1. SSIM相似度(使用简化版本)
|
||||
gray_prev = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
|
||||
gray_curr = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||||
ssim_score = ssim(gray_prev, gray_curr, win_size=3)
|
||||
|
||||
# 2. 颜色直方图相似度
|
||||
hist_prev = VideoProcessor.calculate_color_histogram(prev_frame)
|
||||
hist_curr = VideoProcessor.calculate_color_histogram(frame)
|
||||
color_sim = cv2.compareHist(hist_prev, hist_curr, cv2.HISTCMP_CORREL)
|
||||
|
||||
# 3. HOG特征相似度(仅在SSIM和颜色相似度较高时计算)
|
||||
if ssim_score > 0.8 and color_sim > 0.8:
|
||||
hog_prev = VideoProcessor.calculate_hog_features(prev_frame)
|
||||
hog_curr = VideoProcessor.calculate_hog_features(frame)
|
||||
hog_sim = np.dot(hog_prev, hog_curr) / (np.linalg.norm(hog_prev) * np.linalg.norm(hog_curr))
|
||||
else:
|
||||
hog_sim = 0 # 如果SSIM和颜色相似度低,直接跳过HOG计算
|
||||
|
||||
# 4. PPT页面切换检测
|
||||
is_transition = VideoProcessor.is_ppt_transition(prev_frame, frame)
|
||||
|
||||
# 综合判断
|
||||
if (ssim_score > SSIM_THRESHOLD and
|
||||
color_sim > COLOR_THRESHOLD and
|
||||
hog_sim > HOG_THRESHOLD and
|
||||
not is_transition):
|
||||
continue
|
||||
except Exception as e:
|
||||
print(f"[警告] 特征计算失败: {str(e)}")
|
||||
continue
|
||||
|
||||
keyframes.append(Image.fromarray(frame))
|
||||
timestamps.append(curr_time)
|
||||
prev_frame = frame
|
||||
frame_count += 1
|
||||
|
||||
# 每处理100帧强制垃圾回收
|
||||
if frame_count % 100 == 0:
|
||||
import gc
|
||||
gc.collect()
|
||||
|
||||
reader.close()
|
||||
print(f"[图像] 关键帧提取完成,共{len(keyframes)}帧")
|
||||
return keyframes, timestamps
|
||||
except Exception as e:
|
||||
print(f"[错误] 关键帧提取失败: {str(e)}")
|
||||
return [], []
|
||||
|
||||
@staticmethod
|
||||
def is_blank_frame(frame, threshold=30):
|
||||
"""检测是否为无信息帧(纯黑屏或纯白屏)"""
|
||||
try:
|
||||
# 转换为灰度图
|
||||
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||||
|
||||
# 计算图像统计特征
|
||||
mean = np.mean(gray)
|
||||
std_dev = np.std(gray)
|
||||
|
||||
# 检查是否为纯黑或纯白
|
||||
is_black = mean < 10 and std_dev < 5
|
||||
is_white = mean > 245 and std_dev < 5
|
||||
|
||||
# 检查是否有足够的细节
|
||||
has_detail = std_dev > threshold
|
||||
|
||||
return is_black or is_white or not has_detail
|
||||
except Exception as e:
|
||||
print(f"[警告] 检查无信息帧时出错: {str(e)}")
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def transcribe_audio(video_path: str, model_name: str = WHISPER_MODEL) -> list:
|
||||
"""语音识别与时间戳获取(支持中英文混合)"""
|
||||
try:
|
||||
# 使用更大的模型提高准确率
|
||||
model = whisper.load_model(model_name, device="cpu", download_root=MODEL_DIR)
|
||||
|
||||
# 配置转写参数
|
||||
result = model.transcribe(
|
||||
video_path,
|
||||
fp16=False,
|
||||
language="zh",
|
||||
task="transcribe",
|
||||
verbose=True,
|
||||
initial_prompt="这是一段包含中英文的PPT讲解视频,可能包含专业术语。"
|
||||
)
|
||||
|
||||
segments = result.get("segments", [])
|
||||
|
||||
# 后处理:专业术语替换
|
||||
for seg in segments:
|
||||
text = seg["text"]
|
||||
for cn, en in PROFESSIONAL_TERMS.items():
|
||||
text = text.replace(cn, f"{cn}({en})")
|
||||
seg["text"] = text
|
||||
|
||||
return segments
|
||||
except Exception as e:
|
||||
print(f"[错误] 语音识别失败: {str(e)}")
|
||||
return []
|
||||
|
||||
|
||||
# ---------------------- 业务逻辑模块 ----------------------
|
||||
class ContentAligner:
|
||||
@staticmethod
|
||||
def generate_page_intervals(timestamps: list, duration: float) -> list:
|
||||
"""生成页面时间段"""
|
||||
intervals = []
|
||||
for i in range(len(timestamps)):
|
||||
start = timestamps[i]
|
||||
end = timestamps[i + 1] if i < len(timestamps) - 1 else duration
|
||||
intervals.append((start, end))
|
||||
return intervals
|
||||
|
||||
@staticmethod
|
||||
def calculate_text_similarity(text1: str, text2: str) -> float:
|
||||
"""计算文本相似度"""
|
||||
# 使用简单的词重叠度计算
|
||||
words1 = set(re.findall(r'\w+', text1.lower()))
|
||||
words2 = set(re.findall(r'\w+', text2.lower()))
|
||||
if not words1 or not words2:
|
||||
return 0.0
|
||||
intersection = words1.intersection(words2)
|
||||
union = words1.union(words2)
|
||||
return len(intersection) / len(union)
|
||||
|
||||
@staticmethod
|
||||
def find_best_match(segments: list, intervals: list) -> dict:
|
||||
"""为每个语音片段找到最佳匹配的页面"""
|
||||
page_texts = defaultdict(list)
|
||||
unmatched_segments = []
|
||||
|
||||
for seg in segments:
|
||||
seg_start = seg["start"]
|
||||
best_match = None
|
||||
best_score = 0.0
|
||||
|
||||
# 1. 首先尝试时间戳匹配
|
||||
for page_idx, (start, end) in enumerate(intervals):
|
||||
if start <= seg_start < end:
|
||||
best_match = page_idx
|
||||
break
|
||||
|
||||
# 2. 如果时间戳匹配失败,尝试文本相似度匹配
|
||||
if best_match is None:
|
||||
for page_idx, (start, end) in enumerate(intervals):
|
||||
# 获取该页面的所有文本
|
||||
page_text = " ".join([s["text"] for s in segments if start <= s["start"] < end])
|
||||
similarity = ContentAligner.calculate_text_similarity(seg["text"], page_text)
|
||||
if similarity > best_score:
|
||||
best_score = similarity
|
||||
best_match = page_idx
|
||||
|
||||
# 3. 如果找到匹配,添加到对应页面
|
||||
if best_match is not None:
|
||||
page_texts[best_match].append(seg)
|
||||
else:
|
||||
unmatched_segments.append(seg)
|
||||
|
||||
# 4. 处理未匹配的片段
|
||||
if unmatched_segments:
|
||||
print(f"[警告] 发现{len(unmatched_segments)}个未匹配的语音片段")
|
||||
# 将未匹配片段添加到最近的页面
|
||||
for seg in unmatched_segments:
|
||||
closest_page = min(range(len(intervals)),
|
||||
key=lambda i: abs(seg["start"] - (intervals[i][0] + intervals[i][1]) / 2))
|
||||
page_texts[closest_page].append(seg)
|
||||
|
||||
return page_texts
|
||||
|
||||
@staticmethod
|
||||
def align_content(video_path: str, timestamps: list) -> list:
|
||||
"""语音-画面对齐主逻辑(改进版)"""
|
||||
try:
|
||||
reader = imageio.get_reader(video_path)
|
||||
duration = reader.get_meta_data()["duration"]
|
||||
reader.close()
|
||||
except:
|
||||
duration = timestamps[-1] + FRAME_INTERVAL
|
||||
|
||||
segments = VideoProcessor.transcribe_audio(video_path)
|
||||
intervals = ContentAligner.generate_page_intervals(timestamps, duration)
|
||||
|
||||
# 使用改进的匹配算法
|
||||
page_texts = ContentAligner.find_best_match(segments, intervals)
|
||||
|
||||
# 生成最终的对齐数据
|
||||
aligned_data = []
|
||||
for idx in range(len(intervals)):
|
||||
text = " ".join([seg["text"] for seg in page_texts.get(idx, [])])
|
||||
aligned_data.append({
|
||||
"page": idx,
|
||||
"start_time": intervals[idx][0],
|
||||
"end_time": intervals[idx][1],
|
||||
"text": text
|
||||
})
|
||||
|
||||
return aligned_data
|
||||
|
||||
|
||||
# ---------------------- 摘要生成模块 ----------------------
|
||||
class SummaryGenerator:
|
||||
@staticmethod
|
||||
def optimize_text(text: str) -> str:
|
||||
"""文本浓缩优化"""
|
||||
sentences = re.split(r'[。!?]', text)
|
||||
filtered = []
|
||||
seen = set()
|
||||
for sent in sentences:
|
||||
sent = sent.strip()
|
||||
if (len(sent) >= 10
|
||||
and not any(word in sent for word in TRANSITION_WORDS)
|
||||
and sent not in seen):
|
||||
filtered.append(sent)
|
||||
seen.add(sent)
|
||||
return '。'.join(filtered) + '。' if filtered else ""
|
||||
|
||||
@staticmethod
|
||||
def generate_html(aligned_data: list, keyframes: list, output_dir: str):
|
||||
"""生成HTML报告"""
|
||||
pages_data = []
|
||||
temp_img_dir = os.path.join(output_dir, "_temp_images")
|
||||
os.makedirs(temp_img_dir, exist_ok=True)
|
||||
|
||||
try:
|
||||
for idx, frame in enumerate(keyframes):
|
||||
img_path = os.path.join(temp_img_dir, f"page_{idx}.jpg")
|
||||
frame.save(img_path)
|
||||
with open(img_path, "rb") as f:
|
||||
img_data = base64.b64encode(f.read()).decode("utf-8")
|
||||
|
||||
pages_data.append({
|
||||
"num": idx + 1,
|
||||
"time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s",
|
||||
"image": f"data:image/jpeg;base64,{img_data}",
|
||||
"text": SummaryGenerator.optimize_text(aligned_data[idx]["text"])
|
||||
})
|
||||
|
||||
env = Environment()
|
||||
template = env.from_string("""
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>PPT视频摘要报告</title>
|
||||
<style>
|
||||
.page { margin: 20px; padding: 15px; border: 1px solid #eee; }
|
||||
img { max-width: 800px; height: auto; }
|
||||
.timestamp { color: #666; font-size: 0.9em; }
|
||||
.content { margin-top: 10px; }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>PPT视频结构化摘要</h1>
|
||||
{% for page in pages %}
|
||||
<div class="page">
|
||||
<h2>页面 {{ page.num }}</h2>
|
||||
<div class="timestamp">{{ page.time }}</div>
|
||||
<img src="{{ page.image }}" alt="页面截图">
|
||||
<div class="content">{{ page.text }}</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</body>
|
||||
</html>
|
||||
""")
|
||||
|
||||
output_path = os.path.join(output_dir, "summary.html")
|
||||
with open(output_path, "w", encoding="utf-8") as f:
|
||||
f.write(template.render(pages=pages_data))
|
||||
print(f"[输出] HTML报告已生成: {output_path}")
|
||||
finally:
|
||||
for f in os.listdir(temp_img_dir):
|
||||
os.remove(os.path.join(temp_img_dir, f))
|
||||
os.rmdir(temp_img_dir)
|
||||
|
||||
@staticmethod
|
||||
def generate_pdf(aligned_data: list, keyframes: list, output_dir: str):
|
||||
"""生成PDF报告(优化版)"""
|
||||
temp_html = os.path.join(output_dir, "_temp_pdf.html")
|
||||
temp_img_dir = os.path.join(output_dir, "_temp_pdf_images")
|
||||
os.makedirs(temp_img_dir, exist_ok=True)
|
||||
|
||||
try:
|
||||
# 使用绝对路径
|
||||
abs_temp_img_dir = os.path.abspath(temp_img_dir)
|
||||
|
||||
html_content = """
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<style>
|
||||
@page {
|
||||
margin: 20mm;
|
||||
size: A4;
|
||||
}
|
||||
body {
|
||||
font-family: "Microsoft YaHei", "SimSun", sans-serif;
|
||||
line-height: 1.6;
|
||||
color: #333;
|
||||
}
|
||||
.page {
|
||||
page-break-inside: avoid;
|
||||
margin-bottom: 30px;
|
||||
padding: 20px;
|
||||
border: 1px solid #eee;
|
||||
border-radius: 5px;
|
||||
}
|
||||
.page-number {
|
||||
text-align: center;
|
||||
font-size: 24pt;
|
||||
font-weight: bold;
|
||||
margin-bottom: 20px;
|
||||
color: #2c3e50;
|
||||
}
|
||||
.timestamp {
|
||||
color: #666;
|
||||
font-size: 12pt;
|
||||
margin-bottom: 15px;
|
||||
}
|
||||
.image-container {
|
||||
text-align: center;
|
||||
margin: 20px 0;
|
||||
}
|
||||
img {
|
||||
max-width: 90% !important;
|
||||
height: auto;
|
||||
display: block;
|
||||
margin: 0 auto;
|
||||
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
|
||||
}
|
||||
.content {
|
||||
font-size: 14pt;
|
||||
line-height: 1.8;
|
||||
margin-top: 20px;
|
||||
padding: 15px;
|
||||
background: #f9f9f9;
|
||||
border-radius: 5px;
|
||||
}
|
||||
.professional-term {
|
||||
color: #2980b9;
|
||||
font-weight: bold;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1 style="text-align: center; color: #2c3e50; margin-bottom: 40px;">PPT视频结构化摘要</h1>
|
||||
{% for page in pages %}
|
||||
<div class="page">
|
||||
<div class="page-number">第 {{ page.num }} 页</div>
|
||||
<div class="timestamp">时间区间:{{ page.time }}</div>
|
||||
<div class="image-container">
|
||||
<img src="{{ page.image_path }}" alt="页面截图">
|
||||
</div>
|
||||
<div class="content">{{ page.text }}</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
pages_data = []
|
||||
for idx, frame in enumerate(keyframes):
|
||||
img_filename = f"page_{idx}.jpg"
|
||||
img_path = os.path.join(abs_temp_img_dir, img_filename)
|
||||
frame.save(img_path)
|
||||
pages_data.append({
|
||||
"num": idx + 1,
|
||||
"time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s",
|
||||
"image_path": img_path,
|
||||
"text": SummaryGenerator.optimize_text(aligned_data[idx]["text"])
|
||||
})
|
||||
|
||||
env = Environment()
|
||||
template = env.from_string(html_content)
|
||||
with open(temp_html, "w", encoding="utf-8") as f:
|
||||
f.write(template.render(pages=pages_data))
|
||||
|
||||
# PDF生成选项
|
||||
options = {
|
||||
"enable-local-file-access": "",
|
||||
"encoding": "UTF-8",
|
||||
"margin-top": "20mm",
|
||||
"margin-bottom": "20mm",
|
||||
"margin-left": "20mm",
|
||||
"margin-right": "20mm",
|
||||
"no-stop-slow-scripts": "",
|
||||
"quiet": "",
|
||||
"dpi": "300",
|
||||
"image-quality": "100",
|
||||
"enable-smart-shrinking": "",
|
||||
"print-media-type": ""
|
||||
}
|
||||
config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH)
|
||||
|
||||
pdf_path = os.path.join(output_dir, "summary.pdf")
|
||||
pdfkit.from_file(
|
||||
temp_html,
|
||||
pdf_path,
|
||||
configuration=config,
|
||||
options=options
|
||||
)
|
||||
print(f"[输出] PDF报告已生成: {pdf_path}")
|
||||
|
||||
finally:
|
||||
# 清理临时文件
|
||||
if os.path.exists(temp_html):
|
||||
os.remove(temp_html)
|
||||
if os.path.exists(temp_img_dir):
|
||||
for f in os.listdir(temp_img_dir):
|
||||
os.remove(os.path.join(temp_img_dir, f))
|
||||
os.rmdir(temp_img_dir)
|
||||
|
||||
@classmethod
|
||||
def generate_all(cls, aligned_data: list, keyframes: list, output_dir: str):
|
||||
"""生成所有格式报告"""
|
||||
cls.generate_html(aligned_data, keyframes, output_dir)
|
||||
cls.generate_pdf(aligned_data, keyframes, output_dir)
|
||||
|
||||
|
||||
# ---------------------- 主流程控制 ----------------------
|
||||
def main_process():
|
||||
# 环境检查
|
||||
processor = VideoProcessor()
|
||||
if not processor.check_ffmpeg():
|
||||
return
|
||||
if not os.path.exists(VIDEO_PATH):
|
||||
print(f"[错误] 视频文件不存在: {VIDEO_PATH}")
|
||||
return
|
||||
|
||||
# 关键帧提取
|
||||
keyframes, timestamps = processor.extract_keyframes(VIDEO_PATH)
|
||||
if not keyframes:
|
||||
print("[错误] 未提取到关键帧")
|
||||
return
|
||||
|
||||
# 内容对齐
|
||||
aligned_data = ContentAligner.align_content(VIDEO_PATH, timestamps)
|
||||
if not aligned_data:
|
||||
print("[警告] 未识别到有效语音内容")
|
||||
|
||||
# 生成摘要
|
||||
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
||||
SummaryGenerator.generate_all(aligned_data, keyframes, OUTPUT_DIR)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main_process()
|
BIN
7.0/summary.pdf
BIN
7.0/summary.pdf
Binary file not shown.
@ -1,44 +0,0 @@
|
||||
# 1. 选择基础镜像 (推荐使用具体的版本号)
|
||||
FROM python:3.10
|
||||
|
||||
# 2. 设置工作目录
|
||||
WORKDIR /app
|
||||
|
||||
# 3. 更新apt包列表并安装系统依赖
|
||||
# - build-essential: 用于编译一些Python包可能需要的C/C++代码
|
||||
# - ffmpeg: 被 moviepy 和 imageio-ffmpeg 需要
|
||||
# - libgl1-mesa-glx, libglib2.0-0: opencv-python 可能需要的运行时库
|
||||
# - wkhtmltopdf: pdfkit 需要的工具
|
||||
# --no-install-recommends 减少不必要的包安装
|
||||
# 最后清理 apt 缓存以减小镜像体积
|
||||
RUN apt-get update && \
|
||||
apt-get install -y --no-install-recommends \
|
||||
build-essential \
|
||||
ffmpeg \
|
||||
libgl1-mesa-glx \
|
||||
libglib2.0-0 \
|
||||
wkhtmltopdf \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# 4. (推荐) 将你的 requirements.txt 文件复制到镜像中
|
||||
# 先复制 requirements.txt 并安装依赖,可以利用 Docker 的层缓存机制
|
||||
# 只有当 requirements.txt 改变时,这一层及之后的层才会重新构建
|
||||
COPY requirements.txt .
|
||||
|
||||
# 5. 安装 Python 依赖
|
||||
# --no-cache-dir 减少镜像体积
|
||||
# -r requirements.txt 从文件安装
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
# 6. 复制你的 Flask 应用代码到镜像中
|
||||
COPY . .
|
||||
|
||||
# 7. 声明你的 Flask 应用监听的端口 (默认是 5000)
|
||||
EXPOSE 5000
|
||||
|
||||
# 8. 定义容器启动时运行的命令
|
||||
# 使用 Gunicorn 或 uWSGI 在生产环境中通常更好,但对于开发,flask run 也可以
|
||||
# 确保 Flask 监听 0.0.0.0 以便从容器外部访问
|
||||
CMD ["flask", "run", "--host=0.0.0.0"]
|
||||
# 或者如果你的启动文件是 app.py:
|
||||
# CMD ["python", "app.py"]
|
@ -1,14 +0,0 @@
|
||||
numpy>=1.21.0
|
||||
opencv-python>=4.5.3
|
||||
Pillow>=8.3.1
|
||||
imageio>=2.9.0
|
||||
imageio-ffmpeg>=0.4.5
|
||||
scikit-image>=0.18.3
|
||||
scipy>=1.7.1
|
||||
openai-whisper>=20231117
|
||||
pdfkit>=1.0.0
|
||||
Jinja2>=3.0.1
|
||||
moviepy>=1.0.3
|
||||
reportlab>=3.6.8
|
||||
torch>=1.9.0
|
||||
tqdm>=4.62.3
|
@ -1,7 +0,0 @@
|
||||
@echo off
|
||||
echo 正在启动视频批量处理系统...
|
||||
start "" python server.py
|
||||
timeout /t 3
|
||||
start http://localhost:5000
|
||||
echo 服务已启动,如果浏览器没有自动打开,请手动访问 http://localhost:5000
|
||||
pause
|
Binary file not shown.
BIN
视频处理系统.zip
BIN
视频处理系统.zip
Binary file not shown.
Loading…
Reference in New Issue
Block a user