Compare commits


No commits in common. "main" and "2" have entirely different histories.
main ... 2

16 changed files with 0 additions and 3378 deletions

Binary file not shown.

Binary file not shown.


@@ -1,576 +0,0 @@
import cv2
import numpy as np
from skimage.metrics import structural_similarity as ssim
from moviepy.editor import VideoFileClip
from PIL import Image
import os
from scipy.signal import find_peaks
import torch
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import whisper
from collections import defaultdict
import re
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import Paragraph, Image as RLImage
from reportlab.lib.units import inch
import threading
import pdfkit
from jinja2 import Environment
import io
import base64
# 全局配置
SSIM_THRESHOLD = 0.85 # 关键帧去重阈值
FRAME_INTERVAL = 2 # 抽帧间隔(秒)
TRANSITION_WORDS = ["接下来", "下一页", "如图"] # 过渡词过滤列表
WKHTMLTOPDF_PATH = r"D:\wkhtmltopdf\bin\wkhtmltopdf.exe"
class PPTSummarizer:
def __init__(self, video_path, output_dir=None):
self.video_path = video_path
# 如果没有指定输出目录,则使用默认目录
if output_dir is None:
# 使用视频文件名作为输出目录名
video_name = os.path.splitext(os.path.basename(video_path))[0]
self.output_dir = os.path.join("output", video_name)
else:
self.output_dir = output_dir
self.frames = []
self.key_frames = []
self.text_content = []
self.frame_timestamps = []
self.aligned_data = []
self.processing_complete = threading.Event()
# 创建输出目录
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
# 初始化whisper模型
self.whisper_model = whisper.load_model("tiny", device="cpu")
def extract_frames(self):
"""提取视频帧"""
try:
# 使用VideoFileClip替代cv2
video = VideoFileClip(self.video_path)
duration = video.duration
fps = video.fps
# 计算采样间隔
sample_interval = max(1 / fps, FRAME_INTERVAL)
timestamps = np.arange(0, duration, sample_interval)
print(f"开始提取帧,视频时长:{duration:.2f}秒,FPS:{fps}")
# 提取帧
for t in timestamps:
try:
frame = video.get_frame(t)
# 转换为BGR格式(OpenCV格式)
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
self.frames.append(frame_bgr)
self.frame_timestamps.append(t)
except Exception as e:
print(f"提取帧 {t}s 时出错: {str(e)}")
continue
video.close()
print(f"成功提取 {len(self.frames)} 帧")
return fps
except Exception as e:
print(f"视频处理出错: {str(e)}")
return 0
def process_audio(self):
"""处理音频"""
try:
print("开始语音识别...")
# 使用已加载的whisper模型进行转写
result = self.whisper_model.transcribe(
self.video_path,
fp16=False,
language="zh",
task="transcribe",
verbose=True
)
segments = result.get("segments", [])
print(f"语音识别完成,共识别出 {len(segments)} 个片段")
# 打印识别结果
for i, seg in enumerate(segments):
try:
print(f"片段 {i + 1}: {seg['start']:.1f}s - {seg['end']:.1f}s: {seg['text']}")
except UnicodeEncodeError:
print(f"片段 {i + 1}: {seg['start']:.1f}s - {seg['end']:.1f}s: [文本包含特殊字符]")
# 生成页面时间段
intervals = []
for i in range(len(self.frame_timestamps)):
start = self.frame_timestamps[i]
end = self.frame_timestamps[i + 1] if i < len(self.frame_timestamps) - 1 else self.frame_timestamps[-1] + 1
intervals.append((start, end))
print(f"页面 {i + 1} 时间段: {start:.1f}s - {end:.1f}s")
# 改进的对齐逻辑
page_texts = defaultdict(list)
page_segments = defaultdict(list) # 存储每个页面对应的片段
all_texts = [] # 存储所有文本片段
used_texts = set() # 跟踪已使用的文本
# 第一步:收集所有文本片段
for seg in segments:
try:
seg_start = seg["start"]
seg_end = seg["end"]
seg_text = seg["text"].strip()
all_texts.append((seg_start, seg_end, seg_text))
except Exception as e:
print(f"处理片段时出错: {str(e)}")
continue
# 第二步:将文本分配到各个页面
for start, end, text in all_texts:
try:
# 找到与当前片段时间重叠的所有页面
overlapping_pages = []
for page_idx, (page_start, page_end) in enumerate(intervals):
if (start <= page_end and end >= page_start):
overlapping_pages.append((page_idx, page_start, page_end))
# 如果找到重叠页面,将文本添加到最合适的页面
if overlapping_pages:
# 计算每个页面的重叠时间
page_overlaps = []
for page_idx, page_start, page_end in overlapping_pages:
overlap_start = max(start, page_start)
overlap_end = min(end, page_end)
overlap_duration = overlap_end - overlap_start
page_overlaps.append((page_idx, overlap_duration))
# 按重叠时间排序
page_overlaps.sort(key=lambda x: x[1], reverse=True)
# 将文本添加到重叠时间最长的页面
best_page = page_overlaps[0][0]
if text not in used_texts: # 确保文本未被使用
page_texts[best_page].append(text)
page_segments[best_page].append((start, end, text))
used_texts.add(text)
print(f"将文本 '{text}' 添加到页面 {best_page + 1}")
except Exception as e:
print(f"分配文本时出错: {str(e)}")
continue
# 第三步:优化每个页面的文本
self.aligned_data = []
for idx in range(len(intervals)):
try:
# 获取当前页面的所有片段
segments = page_segments[idx]
# 按时间排序
segments.sort(key=lambda x: x[0])
# 合并相邻的相似文本
merged_texts = []
current_text = ""
last_end_time = 0
for start, end, text in segments:
# 如果当前文本为空,直接添加
if not current_text:
current_text = text
last_end_time = end
continue
# 计算时间间隔
time_gap = start - last_end_time
# 如果时间间隔小于3秒,合并文本
if time_gap < 3.0: # 增加时间间隔阈值
current_text += " " + text
else:
merged_texts.append(current_text)
current_text = text
last_end_time = end
# 添加最后一个文本
if current_text:
merged_texts.append(current_text)
# 合并所有文本
final_text = " ".join(merged_texts)
# 如果当前页面文本为空,尝试从前一页面获取
if not final_text and idx > 0:
final_text = self.aligned_data[idx - 1]["text"]
# 优化文本
optimized_text = self.optimize_text(final_text)
if optimized_text:
print(f"页面 {idx + 1} 的优化后文本内容: {optimized_text}")
self.aligned_data.append({
"page": idx,
"start_time": intervals[idx][0],
"end_time": intervals[idx][1],
"text": optimized_text
})
except Exception as e:
print(f"处理页面 {idx + 1} 时出错: {str(e)}")
# 添加空数据
self.aligned_data.append({
"page": idx,
"start_time": intervals[idx][0],
"end_time": intervals[idx][1],
"text": ""
})
# 第四步:确保所有文本都被包含
# 检查是否有遗漏的文本片段
for start, end, text in all_texts:
try:
if text not in used_texts:
# 找到最后一个非空页面
last_non_empty_page = -1
for i in range(len(self.aligned_data) - 1, -1, -1):
if self.aligned_data[i]["text"]:
last_non_empty_page = i
break
if last_non_empty_page >= 0:
self.aligned_data[last_non_empty_page]["text"] += " " + text
print(f"将遗漏的文本 '{text}' 添加到页面 {last_non_empty_page + 1}")
except Exception as e:
print(f"处理遗漏文本时出错: {str(e)}")
continue
# 保存对齐数据到文件
try:
with open(os.path.join(self.output_dir, 'aligned_data.txt'), 'w', encoding='utf-8') as f:
for data in self.aligned_data:
f.write(f"页面 {data['page'] + 1}:\n")
f.write(f"时间: {data['start_time']:.1f}s - {data['end_time']:.1f}s\n")
f.write(f"文本: {data['text']}\n\n")
print("对齐数据已保存到文件")
except Exception as e:
print(f"保存对齐数据时出错: {str(e)}")
except Exception as e:
print(f"音频处理出错: {str(e)}")
self.aligned_data = []
def process_frames(self):
"""处理视频帧"""
try:
print("开始处理视频帧...")
# 计算帧间相似度
similarities = []
for i in range(len(self.frames) - 1):
try:
frame1 = cv2.cvtColor(self.frames[i], cv2.COLOR_BGR2GRAY)
frame2 = cv2.cvtColor(self.frames[i + 1], cv2.COLOR_BGR2GRAY)
similarity = ssim(frame1, frame2)
similarities.append(similarity)
except Exception as e:
print(f"计算帧 {i} 相似度时出错: {str(e)}")
similarities.append(1.0) # 出错时假设帧相似
# 使用自适应阈值
mean_similarity = np.mean(similarities)
std_similarity = np.std(similarities)
adaptive_threshold = mean_similarity - 2 * std_similarity
# 使用峰值检测找到关键帧
peaks, _ = find_peaks([1 - s for s in similarities],
height=1 - adaptive_threshold,
distance=max(1, len(similarities) // 20)) # 最小距离(至少为1)
# 保存关键帧
for peak in peaks:
if not self.is_blank_frame(self.frames[peak]):
self.key_frames.append(self.frames[peak])
print(f"找到 {len(self.key_frames)} 个关键帧")
except Exception as e:
print(f"处理视频帧时出错: {str(e)}")
def is_blank_frame(self, frame, threshold=30):
"""检测是否为空白帧"""
try:
# 转换为灰度图
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 计算图像统计特征
mean = np.mean(gray)
std_dev = np.std(gray)
# 检查是否为纯黑或纯白
is_black = mean < 10 and std_dev < 5
is_white = mean > 245 and std_dev < 5
# 检查是否有足够的细节
has_detail = std_dev > threshold
return is_black or is_white or not has_detail
except Exception as e:
print(f"检查空白帧时出错: {str(e)}")
return True
def optimize_text(self, text):
"""文本优化"""
try:
if not text:
return ""
# 过滤过渡词
sentences = re.split(r'[。!?]', text)
filtered = []
seen = set()
for sent in sentences:
sent = sent.strip()
if (len(sent) >= 10
and not any(word in sent for word in TRANSITION_WORDS)
and sent not in seen):
filtered.append(sent)
seen.add(sent)
result = '。'.join(filtered) + '。' if filtered else ""
if result:
print(f"优化后的文本: {result}")
return result
except Exception as e:
print(f"文本优化时出错: {str(e)}")
return text
def save_results(self):
"""保存结果"""
try:
# 检查输出目录权限
if not os.access(self.output_dir, os.W_OK):
print(f"错误:没有写入权限: {self.output_dir}")
return
# 生成PDF文档
pdf_path = os.path.join(self.output_dir, 'summary.pdf')
# 创建临时HTML文件
temp_html = os.path.join(self.output_dir, "_temp_pdf.html")
temp_img_dir = os.path.join(self.output_dir, "_temp_pdf_images")
os.makedirs(temp_img_dir, exist_ok=True)
try:
# 使用绝对路径
abs_temp_img_dir = os.path.abspath(temp_img_dir)
html_content = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<style>
@page {
margin: 20px;
size: A4;
}
body {
font-family: "Microsoft YaHei", "SimSun", sans-serif;
line-height: 1.6;
margin: 0;
padding: 20px;
}
.page {
page-break-inside: avoid;
margin-bottom: 30px;
padding: 20px;
background-color: white;
}
img {
max-width: 100%;
height: auto;
display: block;
margin: 10px auto;
}
.timestamp {
color: #666;
font-size: 12pt;
margin: 10px 0;
}
.content {
font-size: 14pt;
line-height: 1.6;
margin: 15px 0;
}
h1 {
text-align: center;
color: #333;
margin-bottom: 30px;
}
h2 {
color: #444;
margin: 15px 0;
}
</style>
</head>
<body>
<h1>PPT视频结构化摘要</h1>
{% for page in pages %}
<div class="page">
<h2>页面 {{ page.num }}</h2>
<div class="timestamp">{{ page.time }}</div>
<img src="{{ page.image_path }}" alt="页面截图">
<div class="content">{{ page.text }}</div>
</div>
{% endfor %}
</body>
</html>
"""
pages_data = []
for idx, frame in enumerate(self.key_frames):
try:
img_filename = f"page_{idx}.jpg"
img_path = os.path.join(abs_temp_img_dir, img_filename)
# 将numpy数组转换为PIL Image对象
if len(frame.shape) == 3 and frame.shape[2] == 3:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
else:
frame_rgb = frame
# 创建PIL Image对象
img = Image.fromarray(frame_rgb)
# 调整图片大小
max_width = 800
if img.width > max_width:
ratio = max_width / img.width
new_height = int(img.height * ratio)
img = img.resize((max_width, new_height), Image.Resampling.LANCZOS)
# 保存图片
img.save(img_path, format='JPEG', quality=85, optimize=True)
# 获取从开始到当前帧的所有文本
current_time = self.aligned_data[idx]['end_time']
# 收集从开始到当前时间点的所有文本
texts = []
for data in self.aligned_data:
if data['end_time'] <= current_time:
if data['text']:
texts.append(data['text'])
# 合并文本
combined_text = " ".join(texts)
# 如果只有一帧关键帧,显示整个视频的所有文本
if len(self.key_frames) == 1:
all_texts = []
for data in self.aligned_data:
if data['text']:
all_texts.append(data['text'])
combined_text = " ".join(all_texts)
# 添加file://前缀到图片路径
img_path_with_prefix = f"file:///{img_path.replace(os.sep, '/')}"
pages_data.append({
"num": idx + 1,
"time": f"{self.aligned_data[idx]['start_time']:.1f}s - {self.aligned_data[idx]['end_time']:.1f}s",
"image_path": img_path_with_prefix,
"text": combined_text
})
except Exception as e:
print(f"处理页面 {idx + 1} 时出错: {str(e)}")
continue
# 生成HTML文件
env = Environment()
template = env.from_string(html_content)
with open(temp_html, "w", encoding="utf-8") as f:
f.write(template.render(pages=pages_data))
# 设置PDF生成选项
options = {
"enable-local-file-access": "",
"encoding": "UTF-8",
"margin-top": "15mm",
"margin-bottom": "15mm",
"margin-left": "15mm",
"margin-right": "15mm",
"quiet": "",
"print-media-type": "",
"page-size": "A4",
"orientation": "Portrait"
}
config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH)
# 生成PDF
pdfkit.from_file(
temp_html,
pdf_path,
configuration=config,
options=options
)
print(f"PDF已保存到: {pdf_path}")
finally:
# 清理临时文件
if os.path.exists(temp_html):
os.remove(temp_html)
if os.path.exists(temp_img_dir):
for f in os.listdir(temp_img_dir):
os.remove(os.path.join(temp_img_dir, f))
os.rmdir(temp_img_dir)
except Exception as e:
print(f"保存结果时出错: {str(e)}")
def process(self):
"""处理视频并生成摘要"""
try:
start_time = time.time()
print("开始处理视频...")
# 提取视频帧
self.extract_frames()
# 创建线程池
with ThreadPoolExecutor(max_workers=2) as executor:
# 提交音频处理和帧处理任务
audio_future = executor.submit(self.process_audio)
frames_future = executor.submit(self.process_frames)
# 等待两个任务完成
audio_future.result()
frames_future.result()
print("生成摘要...")
self.save_results()
end_time = time.time()
print(f"处理完成!总耗时:{end_time - start_time:.2f}秒")
except Exception as e:
print(f"处理过程出错: {str(e)}")
raise # 重新抛出异常以便调试
if __name__ == "__main__":
# 使用示例
video_path = "D:/python项目文件/1/input3.mp4" # 替换为实际的视频路径
output_dir = "custom_output" # 自定义输出路径
summarizer = PPTSummarizer(video_path, output_dir)
summarizer.process()

Binary file not shown.


@@ -1,525 +0,0 @@
import os
import re
import base64
import warnings
import imageio
import whisper
import numpy as np
import pdfkit
from PIL import Image
from skimage.metrics import structural_similarity as ssim
from collections import defaultdict
import subprocess
from jinja2 import Environment
import cv2
from scipy.signal import find_peaks
from skimage.feature import hog
from skimage.color import rgb2gray
# ======================== 全局配置 ========================
warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead")
VIDEO_PATH = "D:/python项目文件/1/input.mp4" # 输入视频路径
MODEL_DIR = "D:/whisper_models" # Whisper模型目录
FFMPEG_BIN = r"D:\Program Files\ffmpeg\bin" # FFmpeg安装路径
WKHTMLTOPDF_PATH = r"D:\wkhtmltopdf\bin\wkhtmltopdf.exe" # wkhtmltopdf路径
SSIM_THRESHOLD = 0.85 # 关键帧去重阈值
FRAME_INTERVAL = 2 # 抽帧间隔(秒)
OUTPUT_DIR = r"D:\桌面文件\python\output" # 输出目录
TRANSITION_WORDS = ["接下来", "下一页", "如图"] # 过渡词过滤列表
HOG_THRESHOLD = 0.7 # HOG特征相似度阈值
COLOR_THRESHOLD = 0.8 # 颜色直方图相似度阈值
WHISPER_MODEL = "base" # Whisper模型大小
PROFESSIONAL_TERMS = {
"人工智能": "AI",
"机器学习": "ML",
"深度学习": "DL",
"神经网络": "NN",
"卷积神经网络": "CNN",
"循环神经网络": "RNN",
"自然语言处理": "NLP",
"计算机视觉": "CV",
"大数据": "Big Data",
"云计算": "Cloud Computing"
} # 专业术语词典
# ========================================================
# ---------------------- 核心功能模块 ----------------------
class VideoProcessor:
def __init__(self):
os.environ["PATH"] = FFMPEG_BIN + os.pathsep + os.environ["PATH"]
@staticmethod
def check_ffmpeg():
"""验证FFmpeg可用性"""
try:
subprocess.run(["ffmpeg", "-version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print("[系统] FFmpeg验证成功")
return True
except Exception as e:
print(f"[错误] FFmpeg验证失败: {str(e)}")
return False
@staticmethod
def calculate_color_histogram(frame):
"""计算颜色直方图特征"""
hist = cv2.calcHist([frame], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
cv2.normalize(hist, hist)
return hist.flatten()
@staticmethod
def calculate_hog_features(frame):
"""计算HOG特征"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
features = hog(gray, orientations=8, pixels_per_cell=(16, 16),
cells_per_block=(1, 1), visualize=False)
return features
@staticmethod
def is_ppt_transition(frame1, frame2):
"""检测PPT页面切换"""
# 转换为灰度图
gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
# 计算边缘
edges1 = cv2.Canny(gray1, 100, 200)
edges2 = cv2.Canny(gray2, 100, 200)
# 计算边缘差异
diff = cv2.absdiff(edges1, edges2)
return np.mean(diff) > 50 # 阈值可调整
@staticmethod
def extract_keyframes(video_path: str) -> tuple:
"""提取去重关键帧及其时间戳(多特征融合)"""
try:
reader = imageio.get_reader(video_path)
fps = reader.get_meta_data()["fps"]
keyframes = []
timestamps = []
prev_frame = None
prev_features = None
for idx, frame in enumerate(reader):
curr_time = idx / fps
if curr_time - (timestamps[-1] if timestamps else 0) < FRAME_INTERVAL:
continue
# 多特征相似度计算
if prev_frame is not None:
# 1. SSIM相似度
gray_prev = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
gray_curr = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
ssim_score = ssim(gray_prev, gray_curr)
# 2. 颜色直方图相似度
hist_prev = VideoProcessor.calculate_color_histogram(prev_frame)
hist_curr = VideoProcessor.calculate_color_histogram(frame)
color_sim = cv2.compareHist(hist_prev, hist_curr, cv2.HISTCMP_CORREL)
# 3. HOG特征相似度
hog_prev = VideoProcessor.calculate_hog_features(prev_frame)
hog_curr = VideoProcessor.calculate_hog_features(frame)
hog_sim = np.dot(hog_prev, hog_curr) / (np.linalg.norm(hog_prev) * np.linalg.norm(hog_curr))
# 4. PPT页面切换检测
is_transition = VideoProcessor.is_ppt_transition(prev_frame, frame)
# 综合判断
if (ssim_score > SSIM_THRESHOLD and
color_sim > COLOR_THRESHOLD and
hog_sim > HOG_THRESHOLD and
not is_transition):
continue
keyframes.append(Image.fromarray(frame))
timestamps.append(curr_time)
prev_frame = frame
reader.close()
print(f"[图像] 关键帧提取完成,共{len(keyframes)}帧")
return keyframes, timestamps
except Exception as e:
print(f"[错误] 关键帧提取失败: {str(e)}")
return [], []
@staticmethod
def transcribe_audio(video_path: str, model_name: str = WHISPER_MODEL) -> list:
"""语音识别与时间戳获取(支持中英文混合)"""
try:
# 使用更大的模型提高准确率
model = whisper.load_model(model_name, device="cpu", download_root=MODEL_DIR)
# 配置转写参数
result = model.transcribe(
video_path,
fp16=False,
language="zh",
task="transcribe",
verbose=True,
initial_prompt="这是一段包含中英文的PPT讲解视频可能包含专业术语。"
)
segments = result.get("segments", [])
# 后处理:专业术语替换
for seg in segments:
text = seg["text"]
for cn, en in PROFESSIONAL_TERMS.items():
text = text.replace(cn, f"{cn}({en})")
seg["text"] = text
return segments
except Exception as e:
print(f"[错误] 语音识别失败: {str(e)}")
return []
# ---------------------- 业务逻辑模块 ----------------------
class ContentAligner:
@staticmethod
def generate_page_intervals(timestamps: list, duration: float) -> list:
"""生成页面时间段"""
intervals = []
for i in range(len(timestamps)):
start = timestamps[i]
end = timestamps[i + 1] if i < len(timestamps) - 1 else duration
intervals.append((start, end))
return intervals
@staticmethod
def calculate_text_similarity(text1: str, text2: str) -> float:
"""计算文本相似度"""
# 使用简单的词重叠度计算
words1 = set(re.findall(r'\w+', text1.lower()))
words2 = set(re.findall(r'\w+', text2.lower()))
if not words1 or not words2:
return 0.0
intersection = words1.intersection(words2)
union = words1.union(words2)
return len(intersection) / len(union)
@staticmethod
def find_best_match(segments: list, intervals: list) -> dict:
"""为每个语音片段找到最佳匹配的页面"""
page_texts = defaultdict(list)
unmatched_segments = []
for seg in segments:
seg_start = seg["start"]
best_match = None
best_score = 0.0
# 1. 首先尝试时间戳匹配
for page_idx, (start, end) in enumerate(intervals):
if start <= seg_start < end:
best_match = page_idx
break
# 2. 如果时间戳匹配失败,尝试文本相似度匹配
if best_match is None:
for page_idx, (start, end) in enumerate(intervals):
# 获取该页面的所有文本
page_text = " ".join([s["text"] for s in segments if start <= s["start"] < end])
similarity = ContentAligner.calculate_text_similarity(seg["text"], page_text)
if similarity > best_score:
best_score = similarity
best_match = page_idx
# 3. 如果找到匹配,添加到对应页面
if best_match is not None:
page_texts[best_match].append(seg)
else:
unmatched_segments.append(seg)
# 4. 处理未匹配的片段
if unmatched_segments:
print(f"[警告] 发现{len(unmatched_segments)}个未匹配的语音片段")
# 将未匹配片段添加到最近的页面
for seg in unmatched_segments:
closest_page = min(range(len(intervals)),
key=lambda i: abs(seg["start"] - (intervals[i][0] + intervals[i][1]) / 2))
page_texts[closest_page].append(seg)
return page_texts
@staticmethod
def align_content(video_path: str, timestamps: list) -> list:
"""语音-画面对齐主逻辑(改进版)"""
try:
reader = imageio.get_reader(video_path)
duration = reader.get_meta_data()["duration"]
reader.close()
except Exception:
duration = timestamps[-1] + FRAME_INTERVAL
segments = VideoProcessor.transcribe_audio(video_path)
intervals = ContentAligner.generate_page_intervals(timestamps, duration)
# 使用改进的匹配算法
page_texts = ContentAligner.find_best_match(segments, intervals)
# 生成最终的对齐数据
aligned_data = []
for idx in range(len(intervals)):
text = " ".join([seg["text"] for seg in page_texts.get(idx, [])])
aligned_data.append({
"page": idx,
"start_time": intervals[idx][0],
"end_time": intervals[idx][1],
"text": text
})
return aligned_data
# ---------------------- 摘要生成模块 ----------------------
class SummaryGenerator:
@staticmethod
def optimize_text(text: str) -> str:
"""文本浓缩优化"""
sentences = re.split(r'[。!?]', text)
filtered = []
seen = set()
for sent in sentences:
sent = sent.strip()
if (len(sent) >= 10
and not any(word in sent for word in TRANSITION_WORDS)
and sent not in seen):
filtered.append(sent)
seen.add(sent)
return '。'.join(filtered) + '。' if filtered else ""
@staticmethod
def generate_html(aligned_data: list, keyframes: list, output_dir: str):
"""生成HTML报告"""
pages_data = []
temp_img_dir = os.path.join(output_dir, "_temp_images")
os.makedirs(temp_img_dir, exist_ok=True)
try:
for idx, frame in enumerate(keyframes):
img_path = os.path.join(temp_img_dir, f"page_{idx}.jpg")
frame.save(img_path)
with open(img_path, "rb") as f:
img_data = base64.b64encode(f.read()).decode("utf-8")
pages_data.append({
"num": idx + 1,
"time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s",
"image": f"data:image/jpeg;base64,{img_data}",
"text": SummaryGenerator.optimize_text(aligned_data[idx]["text"])
})
env = Environment()
template = env.from_string("""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>PPT视频摘要报告</title>
<style>
.page { margin: 20px; padding: 15px; border: 1px solid #eee; }
img { max-width: 800px; height: auto; }
.timestamp { color: #666; font-size: 0.9em; }
.content { margin-top: 10px; }
</style>
</head>
<body>
<h1>PPT视频结构化摘要</h1>
{% for page in pages %}
<div class="page">
<h2>页面 {{ page.num }}</h2>
<div class="timestamp">{{ page.time }}</div>
<img src="{{ page.image }}" alt="页面截图">
<div class="content">{{ page.text }}</div>
</div>
{% endfor %}
</body>
</html>
""")
output_path = os.path.join(output_dir, "summary.html")
with open(output_path, "w", encoding="utf-8") as f:
f.write(template.render(pages=pages_data))
print(f"[输出] HTML报告已生成: {output_path}")
finally:
for f in os.listdir(temp_img_dir):
os.remove(os.path.join(temp_img_dir, f))
os.rmdir(temp_img_dir)
@staticmethod
def generate_pdf(aligned_data: list, keyframes: list, output_dir: str):
"""生成PDF报告优化版"""
temp_html = os.path.join(output_dir, "_temp_pdf.html")
temp_img_dir = os.path.join(output_dir, "_temp_pdf_images")
os.makedirs(temp_img_dir, exist_ok=True)
try:
# 使用绝对路径
abs_temp_img_dir = os.path.abspath(temp_img_dir)
html_content = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<style>
@page {
margin: 20mm;
size: A4;
}
body {
font-family: "Microsoft YaHei", "SimSun", sans-serif;
line-height: 1.6;
color: #333;
}
.page {
page-break-inside: avoid;
margin-bottom: 30px;
padding: 20px;
border: 1px solid #eee;
border-radius: 5px;
}
.page-number {
text-align: center;
font-size: 24pt;
font-weight: bold;
margin-bottom: 20px;
color: #2c3e50;
}
.timestamp {
color: #666;
font-size: 12pt;
margin-bottom: 15px;
}
.image-container {
text-align: center;
margin: 20px 0;
}
img {
max-width: 90% !important;
height: auto;
display: block;
margin: 0 auto;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
}
.content {
font-size: 14pt;
line-height: 1.8;
margin-top: 20px;
padding: 15px;
background: #f9f9f9;
border-radius: 5px;
}
.professional-term {
color: #2980b9;
font-weight: bold;
}
</style>
</head>
<body>
<h1 style="text-align: center; color: #2c3e50; margin-bottom: 40px;">PPT视频结构化摘要</h1>
{% for page in pages %}
<div class="page">
<div class="page-number"> {{ page.num }} </div>
<div class="timestamp">时间区间{{ page.time }}</div>
<div class="image-container">
<img src="{{ page.image_path }}" alt="页面截图">
</div>
<div class="content">{{ page.text }}</div>
</div>
{% endfor %}
</body>
</html>
"""
pages_data = []
for idx, frame in enumerate(keyframes):
img_filename = f"page_{idx}.jpg"
img_path = os.path.join(abs_temp_img_dir, img_filename)
frame.save(img_path)
pages_data.append({
"num": idx + 1,
"time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s",
"image_path": img_path,
"text": SummaryGenerator.optimize_text(aligned_data[idx]["text"])
})
env = Environment()
template = env.from_string(html_content)
with open(temp_html, "w", encoding="utf-8") as f:
f.write(template.render(pages=pages_data))
# PDF生成选项
options = {
"enable-local-file-access": "",
"encoding": "UTF-8",
"margin-top": "20mm",
"margin-bottom": "20mm",
"margin-left": "20mm",
"margin-right": "20mm",
"no-stop-slow-scripts": "",
"quiet": "",
"dpi": "300",
"image-quality": "100",
"enable-smart-shrinking": "",
"print-media-type": ""
}
config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH)
pdf_path = os.path.join(output_dir, "summary.pdf")
pdfkit.from_file(
temp_html,
pdf_path,
configuration=config,
options=options
)
print(f"[输出] PDF报告已生成: {pdf_path}")
finally:
# 清理临时文件
if os.path.exists(temp_html):
os.remove(temp_html)
if os.path.exists(temp_img_dir):
for f in os.listdir(temp_img_dir):
os.remove(os.path.join(temp_img_dir, f))
os.rmdir(temp_img_dir)
@classmethod
def generate_all(cls, aligned_data: list, keyframes: list, output_dir: str):
"""生成所有格式报告"""
cls.generate_html(aligned_data, keyframes, output_dir)
cls.generate_pdf(aligned_data, keyframes, output_dir)
# ---------------------- 主流程控制 ----------------------
def main_process():
# 环境检查
processor = VideoProcessor()
if not processor.check_ffmpeg():
return
if not os.path.exists(VIDEO_PATH):
print(f"[错误] 视频文件不存在: {VIDEO_PATH}")
return
# 关键帧提取
keyframes, timestamps = processor.extract_keyframes(VIDEO_PATH)
if not keyframes:
print("[错误] 未提取到关键帧")
return
# 内容对齐
aligned_data = ContentAligner.align_content(VIDEO_PATH, timestamps)
if not aligned_data:
print("[警告] 未识别到有效语音内容")
# 生成摘要
os.makedirs(OUTPUT_DIR, exist_ok=True)
SummaryGenerator.generate_all(aligned_data, keyframes, OUTPUT_DIR)
if __name__ == "__main__":
main_process()

Binary file not shown.


@@ -1,548 +0,0 @@
import os
import re
import base64
import warnings
import imageio
import whisper
import numpy as np
import pdfkit
from PIL import Image
from skimage.metrics import structural_similarity as ssim
from collections import defaultdict
import subprocess
from jinja2 import Environment
import cv2
from scipy.signal import find_peaks
from skimage.feature import hog
from skimage.color import rgb2gray
# ======================== 全局配置 ========================
warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead")
VIDEO_PATH = "D:/python项目文件/1/input3.mp4" # 输入视频路径
MODEL_DIR = "D:/whisper_models" # Whisper模型目录
FFMPEG_BIN = r"D:\Program Files\ffmpeg\bin" # FFmpeg安装路径
WKHTMLTOPDF_PATH = r"D:\wkhtmltopdf\bin\wkhtmltopdf.exe" # wkhtmltopdf路径
SSIM_THRESHOLD = 0.85 # 关键帧去重阈值
FRAME_INTERVAL = 2 # 抽帧间隔(秒)
OUTPUT_DIR = r"D:\桌面文件\python\output" # 输出目录
TRANSITION_WORDS = ["接下来", "下一页", "如图"] # 过渡词过滤列表
HOG_THRESHOLD = 0.7 # HOG特征相似度阈值
COLOR_THRESHOLD = 0.8 # 颜色直方图相似度阈值
WHISPER_MODEL = "base" # Whisper模型大小
PROFESSIONAL_TERMS = {
"人工智能": "AI",
"机器学习": "ML",
"深度学习": "DL",
"神经网络": "NN",
"卷积神经网络": "CNN",
"循环神经网络": "RNN",
"自然语言处理": "NLP",
"计算机视觉": "CV",
"大数据": "Big Data",
"云计算": "Cloud Computing"
} # 专业术语词典
# ========================================================
# ---------------------- 核心功能模块 ----------------------
class VideoProcessor:
def __init__(self):
os.environ["PATH"] = FFMPEG_BIN + os.pathsep + os.environ["PATH"]
@staticmethod
def check_ffmpeg():
"""验证FFmpeg可用性"""
try:
subprocess.run(["ffmpeg", "-version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print("[系统] FFmpeg验证成功")
return True
except Exception as e:
print(f"[错误] FFmpeg验证失败: {str(e)}")
return False
@staticmethod
def calculate_color_histogram(frame):
"""计算颜色直方图特征"""
hist = cv2.calcHist([frame], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
cv2.normalize(hist, hist)
return hist.flatten()
@staticmethod
def calculate_hog_features(frame):
"""计算HOG特征"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
features = hog(gray, orientations=8, pixels_per_cell=(16, 16),
cells_per_block=(1, 1), visualize=False)
return features
@staticmethod
def is_ppt_transition(frame1, frame2):
"""检测PPT页面切换"""
# 转换为灰度图
gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
# 计算边缘
edges1 = cv2.Canny(gray1, 100, 200)
edges2 = cv2.Canny(gray2, 100, 200)
# 计算边缘差异
diff = cv2.absdiff(edges1, edges2)
return np.mean(diff) > 50 # 阈值可调整
@staticmethod
def extract_keyframes(video_path: str) -> tuple:
"""提取去重关键帧及其时间戳(多特征融合)"""
try:
reader = imageio.get_reader(video_path)
fps = reader.get_meta_data()["fps"]
total_frames = reader.count_frames()
print(f"[信息] 视频总帧数: {total_frames}")
keyframes = []
timestamps = []
prev_frame = None
frame_count = 0
last_progress = 0
for idx, frame in enumerate(reader):
# 显示进度
progress = int((idx / total_frames) * 100)
if progress != last_progress and progress % 5 == 0: # 每5%显示一次进度
print(f"[进度] 处理中: {progress}% ({idx}/{total_frames}帧)")
last_progress = progress
curr_time = idx / fps
if curr_time - (timestamps[-1] if timestamps else 0) < FRAME_INTERVAL:
continue
# 多特征相似度计算
if prev_frame is not None:
try:
# 1. SSIM相似度使用简化版本
gray_prev = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
gray_curr = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
ssim_score = ssim(gray_prev, gray_curr, win_size=3)
# 2. 颜色直方图相似度
hist_prev = VideoProcessor.calculate_color_histogram(prev_frame)
hist_curr = VideoProcessor.calculate_color_histogram(frame)
color_sim = cv2.compareHist(hist_prev, hist_curr, cv2.HISTCMP_CORREL)
# 3. HOG特征相似度仅在SSIM和颜色相似度较高时计算
if ssim_score > 0.8 and color_sim > 0.8:
hog_prev = VideoProcessor.calculate_hog_features(prev_frame)
hog_curr = VideoProcessor.calculate_hog_features(frame)
hog_sim = np.dot(hog_prev, hog_curr) / (np.linalg.norm(hog_prev) * np.linalg.norm(hog_curr))
else:
hog_sim = 0 # 如果SSIM和颜色相似度低直接跳过HOG计算
# 4. PPT页面切换检测
is_transition = VideoProcessor.is_ppt_transition(prev_frame, frame)
# 综合判断
if (ssim_score > SSIM_THRESHOLD and
color_sim > COLOR_THRESHOLD and
hog_sim > HOG_THRESHOLD and
not is_transition):
continue
except Exception as e:
print(f"[警告] 特征计算失败: {str(e)}")
continue
keyframes.append(Image.fromarray(frame))
timestamps.append(curr_time)
prev_frame = frame
frame_count += 1
# 每处理100帧强制垃圾回收
if frame_count % 100 == 0:
import gc
gc.collect()
reader.close()
print(f"[图像] 关键帧提取完成,共{len(keyframes)}帧")
return keyframes, timestamps
except Exception as e:
print(f"[错误] 关键帧提取失败: {str(e)}")
return [], []
@staticmethod
def transcribe_audio(video_path: str, model_name: str = WHISPER_MODEL) -> list:
"""语音识别与时间戳获取(支持中英文混合)"""
try:
# 使用更大的模型提高准确率
model = whisper.load_model(model_name, device="cpu", download_root=MODEL_DIR)
# 配置转写参数
result = model.transcribe(
video_path,
fp16=False,
language="zh",
task="transcribe",
verbose=True,
initial_prompt="这是一段包含中英文的PPT讲解视频可能包含专业术语。"
)
segments = result.get("segments", [])
# 后处理:专业术语替换
for seg in segments:
text = seg["text"]
for cn, en in PROFESSIONAL_TERMS.items():
text = text.replace(cn, f"{cn}({en})")
seg["text"] = text
return segments
except Exception as e:
print(f"[错误] 语音识别失败: {str(e)}")
return []
# ---------------------- 业务逻辑模块 ----------------------
class ContentAligner:
@staticmethod
def generate_page_intervals(timestamps: list, duration: float) -> list:
"""生成页面时间段"""
intervals = []
for i in range(len(timestamps)):
start = timestamps[i]
end = timestamps[i + 1] if i < len(timestamps) - 1 else duration
intervals.append((start, end))
return intervals
@staticmethod
def calculate_text_similarity(text1: str, text2: str) -> float:
"""计算文本相似度"""
# 使用简单的词重叠度计算
words1 = set(re.findall(r'\w+', text1.lower()))
words2 = set(re.findall(r'\w+', text2.lower()))
if not words1 or not words2:
return 0.0
intersection = words1.intersection(words2)
union = words1.union(words2)
return len(intersection) / len(union)
@staticmethod
def find_best_match(segments: list, intervals: list) -> dict:
"""为每个语音片段找到最佳匹配的页面"""
page_texts = defaultdict(list)
unmatched_segments = []
for seg in segments:
seg_start = seg["start"]
best_match = None
best_score = 0.0
# 1. 首先尝试时间戳匹配
for page_idx, (start, end) in enumerate(intervals):
if start <= seg_start < end:
best_match = page_idx
break
# 2. 如果时间戳匹配失败,尝试文本相似度匹配
if best_match is None:
for page_idx, (start, end) in enumerate(intervals):
# 获取该页面的所有文本
page_text = " ".join([s["text"] for s in segments if start <= s["start"] < end])
similarity = ContentAligner.calculate_text_similarity(seg["text"], page_text)
if similarity > best_score:
best_score = similarity
best_match = page_idx
# 3. 如果找到匹配,添加到对应页面
if best_match is not None:
page_texts[best_match].append(seg)
else:
unmatched_segments.append(seg)
# 4. 处理未匹配的片段
if unmatched_segments:
print(f"[警告] 发现{len(unmatched_segments)}个未匹配的语音片段")
# 将未匹配片段添加到最近的页面
for seg in unmatched_segments:
closest_page = min(range(len(intervals)),
key=lambda i: abs(seg["start"] - (intervals[i][0] + intervals[i][1]) / 2))
page_texts[closest_page].append(seg)
return page_texts
@staticmethod
def align_content(video_path: str, timestamps: list) -> list:
"""语音-画面对齐主逻辑(改进版)"""
try:
reader = imageio.get_reader(video_path)
duration = reader.get_meta_data()["duration"]
reader.close()
except Exception:
duration = timestamps[-1] + FRAME_INTERVAL
segments = VideoProcessor.transcribe_audio(video_path)
intervals = ContentAligner.generate_page_intervals(timestamps, duration)
# 使用改进的匹配算法
page_texts = ContentAligner.find_best_match(segments, intervals)
# 生成最终的对齐数据
aligned_data = []
for idx in range(len(intervals)):
text = " ".join([seg["text"] for seg in page_texts.get(idx, [])])
aligned_data.append({
"page": idx,
"start_time": intervals[idx][0],
"end_time": intervals[idx][1],
"text": text
})
return aligned_data
# ---------------------- 摘要生成模块 ----------------------
class SummaryGenerator:
@staticmethod
def optimize_text(text: str) -> str:
"""文本浓缩优化"""
sentences = re.split(r'[。!?]', text)
filtered = []
seen = set()
for sent in sentences:
sent = sent.strip()
if (len(sent) >= 10
and not any(word in sent for word in TRANSITION_WORDS)
and sent not in seen):
filtered.append(sent)
seen.add(sent)
return '。'.join(filtered) + '。' if filtered else ""
@staticmethod
def generate_html(aligned_data: list, keyframes: list, output_dir: str):
"""生成HTML报告"""
pages_data = []
temp_img_dir = os.path.join(output_dir, "_temp_images")
os.makedirs(temp_img_dir, exist_ok=True)
try:
for idx, frame in enumerate(keyframes):
img_path = os.path.join(temp_img_dir, f"page_{idx}.jpg")
frame.save(img_path)
with open(img_path, "rb") as f:
img_data = base64.b64encode(f.read()).decode("utf-8")
pages_data.append({
"num": idx + 1,
"time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s",
"image": f"data:image/jpeg;base64,{img_data}",
"text": SummaryGenerator.optimize_text(aligned_data[idx]["text"])
})
env = Environment()
template = env.from_string("""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>PPT视频摘要报告</title>
<style>
.page { margin: 20px; padding: 15px; border: 1px solid #eee; }
img { max-width: 800px; height: auto; }
.timestamp { color: #666; font-size: 0.9em; }
.content { margin-top: 10px; }
</style>
</head>
<body>
<h1>PPT视频结构化摘要</h1>
{% for page in pages %}
<div class="page">
<h2>页面 {{ page.num }}</h2>
<div class="timestamp">{{ page.time }}</div>
<img src="{{ page.image }}" alt="页面截图">
<div class="content">{{ page.text }}</div>
</div>
{% endfor %}
</body>
</html>
""")
output_path = os.path.join(output_dir, "summary.html")
with open(output_path, "w", encoding="utf-8") as f:
f.write(template.render(pages=pages_data))
print(f"[输出] HTML报告已生成: {output_path}")
finally:
for f in os.listdir(temp_img_dir):
os.remove(os.path.join(temp_img_dir, f))
os.rmdir(temp_img_dir)
@staticmethod
def generate_pdf(aligned_data: list, keyframes: list, output_dir: str):
"""生成PDF报告优化版"""
temp_html = os.path.join(output_dir, "_temp_pdf.html")
temp_img_dir = os.path.join(output_dir, "_temp_pdf_images")
os.makedirs(temp_img_dir, exist_ok=True)
try:
# 使用绝对路径
abs_temp_img_dir = os.path.abspath(temp_img_dir)
html_content = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<style>
@page {
margin: 20mm;
size: A4;
}
body {
font-family: "Microsoft YaHei", "SimSun", sans-serif;
line-height: 1.6;
color: #333;
}
.page {
page-break-inside: avoid;
margin-bottom: 30px;
padding: 20px;
border: 1px solid #eee;
border-radius: 5px;
}
.page-number {
text-align: center;
font-size: 24pt;
font-weight: bold;
margin-bottom: 20px;
color: #2c3e50;
}
.timestamp {
color: #666;
font-size: 12pt;
margin-bottom: 15px;
}
.image-container {
text-align: center;
margin: 20px 0;
}
img {
max-width: 90% !important;
height: auto;
display: block;
margin: 0 auto;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
}
.content {
font-size: 14pt;
line-height: 1.8;
margin-top: 20px;
padding: 15px;
background: #f9f9f9;
border-radius: 5px;
}
.professional-term {
color: #2980b9;
font-weight: bold;
}
</style>
</head>
<body>
<h1 style="text-align: center; color: #2c3e50; margin-bottom: 40px;">PPT视频结构化摘要</h1>
{% for page in pages %}
<div class="page">
<div class="page-number"> {{ page.num }} </div>
<div class="timestamp">时间区间{{ page.time }}</div>
<div class="image-container">
<img src="{{ page.image_path }}" alt="页面截图">
</div>
<div class="content">{{ page.text }}</div>
</div>
{% endfor %}
</body>
</html>
"""
pages_data = []
for idx, frame in enumerate(keyframes):
img_filename = f"page_{idx}.jpg"
img_path = os.path.join(abs_temp_img_dir, img_filename)
frame.save(img_path)
pages_data.append({
"num": idx + 1,
"time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s",
"image_path": img_path,
"text": SummaryGenerator.optimize_text(aligned_data[idx]["text"])
})
env = Environment()
template = env.from_string(html_content)
with open(temp_html, "w", encoding="utf-8") as f:
f.write(template.render(pages=pages_data))
# PDF生成选项
options = {
"enable-local-file-access": "",
"encoding": "UTF-8",
"margin-top": "20mm",
"margin-bottom": "20mm",
"margin-left": "20mm",
"margin-right": "20mm",
"no-stop-slow-scripts": "",
"quiet": "",
"dpi": "300",
"image-quality": "100",
"enable-smart-shrinking": "",
"print-media-type": ""
}
config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH)
pdf_path = os.path.join(output_dir, "summary.pdf")
pdfkit.from_file(
temp_html,
pdf_path,
configuration=config,
options=options
)
print(f"[输出] PDF报告已生成: {pdf_path}")
finally:
# 清理临时文件
if os.path.exists(temp_html):
os.remove(temp_html)
if os.path.exists(temp_img_dir):
for f in os.listdir(temp_img_dir):
os.remove(os.path.join(temp_img_dir, f))
os.rmdir(temp_img_dir)
@classmethod
def generate_all(cls, aligned_data: list, keyframes: list, output_dir: str):
"""生成所有格式报告"""
cls.generate_html(aligned_data, keyframes, output_dir)
cls.generate_pdf(aligned_data, keyframes, output_dir)
# ---------------------- 主流程控制 ----------------------
def main_process():
# 环境检查
processor = VideoProcessor()
if not processor.check_ffmpeg():
return
if not os.path.exists(VIDEO_PATH):
print(f"[错误] 视频文件不存在: {VIDEO_PATH}")
return
# 关键帧提取
keyframes, timestamps = processor.extract_keyframes(VIDEO_PATH)
if not keyframes:
print("[错误] 未提取到关键帧")
return
# 内容对齐
aligned_data = ContentAligner.align_content(VIDEO_PATH, timestamps)
if not aligned_data:
print("[警告] 未识别到有效语音内容")
# 生成摘要
os.makedirs(OUTPUT_DIR, exist_ok=True)
SummaryGenerator.generate_all(aligned_data, keyframes, OUTPUT_DIR)
if __name__ == "__main__":
main_process()

Binary file not shown.


@@ -1,575 +0,0 @@
import os
import re
import base64
import warnings
import imageio
import whisper
import numpy as np
import pdfkit
from PIL import Image
from skimage.metrics import structural_similarity as ssim
from collections import defaultdict
import subprocess
from jinja2 import Environment
import cv2
from scipy.signal import find_peaks
from skimage.feature import hog
from skimage.color import rgb2gray
# ======================== 全局配置 ========================
warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead")
VIDEO_PATH = "D:/python项目文件/1/input.mp4" # 输入视频路径
MODEL_DIR = "D:/whisper_models" # Whisper模型目录
FFMPEG_BIN = r"D:\Program Files\ffmpeg\bin" # FFmpeg安装路径
WKHTMLTOPDF_PATH = r"D:\wkhtmltopdf\bin\wkhtmltopdf.exe" # wkhtmltopdf路径
SSIM_THRESHOLD = 0.85 # 关键帧去重阈值
FRAME_INTERVAL = 2 # 抽帧间隔(秒)
OUTPUT_DIR = r"D:\桌面文件\python\output" # 输出目录
TRANSITION_WORDS = ["接下来", "下一页", "如图"] # 过渡词过滤列表
HOG_THRESHOLD = 0.7 # HOG特征相似度阈值
COLOR_THRESHOLD = 0.8 # 颜色直方图相似度阈值
WHISPER_MODEL = "base" # Whisper模型大小
PROFESSIONAL_TERMS = {
"人工智能": "AI",
"机器学习": "ML",
"深度学习": "DL",
"神经网络": "NN",
"卷积神经网络": "CNN",
"循环神经网络": "RNN",
"自然语言处理": "NLP",
"计算机视觉": "CV",
"大数据": "Big Data",
"云计算": "Cloud Computing"
} # 专业术语词典
# ========================================================
# ---------------------- 核心功能模块 ----------------------
class VideoProcessor:
def __init__(self):
os.environ["PATH"] = FFMPEG_BIN + os.pathsep + os.environ["PATH"]
@staticmethod
def check_ffmpeg():
"""验证FFmpeg可用性"""
try:
subprocess.run(["ffmpeg", "-version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print("[系统] FFmpeg验证成功")
return True
except Exception as e:
print(f"[错误] FFmpeg验证失败: {str(e)}")
return False
@staticmethod
def calculate_color_histogram(frame):
"""计算颜色直方图特征"""
hist = cv2.calcHist([frame], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
cv2.normalize(hist, hist)
return hist.flatten()
@staticmethod
def calculate_hog_features(frame):
"""计算HOG特征"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
features = hog(gray, orientations=8, pixels_per_cell=(16, 16),
cells_per_block=(1, 1), visualize=False)
return features
@staticmethod
def is_ppt_transition(frame1, frame2):
"""检测PPT页面切换"""
# 转换为灰度图
gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
# 计算边缘
edges1 = cv2.Canny(gray1, 100, 200)
edges2 = cv2.Canny(gray2, 100, 200)
# 计算边缘差异
diff = cv2.absdiff(edges1, edges2)
return np.mean(diff) > 50 # 阈值可调整
@staticmethod
def extract_keyframes(video_path: str) -> tuple:
"""提取去重关键帧及其时间戳(多特征融合)"""
try:
reader = imageio.get_reader(video_path)
fps = reader.get_meta_data()["fps"]
total_frames = reader.count_frames()
print(f"[信息] 视频总帧数: {total_frames}")
keyframes = []
timestamps = []
prev_frame = None
frame_count = 0
last_progress = 0
for idx, frame in enumerate(reader):
# 显示进度
progress = int((idx / total_frames) * 100)
if progress != last_progress and progress % 5 == 0: # 每5%显示一次进度
print(f"[进度] 处理中: {progress}% ({idx}/{total_frames}帧)")
last_progress = progress
curr_time = idx / fps
if curr_time - (timestamps[-1] if timestamps else 0) < FRAME_INTERVAL:
continue
# 检查是否为无信息帧(纯黑屏或纯白屏)
if VideoProcessor.is_blank_frame(frame):
continue
# 多特征相似度计算
if prev_frame is not None:
try:
# 1. SSIM相似度使用简化版本
gray_prev = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
gray_curr = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
ssim_score = ssim(gray_prev, gray_curr, win_size=3)
# 2. 颜色直方图相似度
hist_prev = VideoProcessor.calculate_color_histogram(prev_frame)
hist_curr = VideoProcessor.calculate_color_histogram(frame)
color_sim = cv2.compareHist(hist_prev, hist_curr, cv2.HISTCMP_CORREL)
# 3. HOG特征相似度仅在SSIM和颜色相似度较高时计算
if ssim_score > 0.8 and color_sim > 0.8:
hog_prev = VideoProcessor.calculate_hog_features(prev_frame)
hog_curr = VideoProcessor.calculate_hog_features(frame)
hog_sim = np.dot(hog_prev, hog_curr) / (np.linalg.norm(hog_prev) * np.linalg.norm(hog_curr))
else:
hog_sim = 0 # 如果SSIM和颜色相似度低直接跳过HOG计算
# 4. PPT页面切换检测
is_transition = VideoProcessor.is_ppt_transition(prev_frame, frame)
# 综合判断
if (ssim_score > SSIM_THRESHOLD and
color_sim > COLOR_THRESHOLD and
hog_sim > HOG_THRESHOLD and
not is_transition):
continue
except Exception as e:
print(f"[警告] 特征计算失败: {str(e)}")
continue
keyframes.append(Image.fromarray(frame))
timestamps.append(curr_time)
prev_frame = frame
frame_count += 1
# 每处理100帧强制垃圾回收
if frame_count % 100 == 0:
import gc
gc.collect()
reader.close()
print(f"[图像] 关键帧提取完成,共{len(keyframes)}帧")
return keyframes, timestamps
except Exception as e:
print(f"[错误] 关键帧提取失败: {str(e)}")
return [], []
@staticmethod
def is_blank_frame(frame, threshold=30):
"""检测是否为无信息帧(纯黑屏或纯白屏)"""
try:
# 转换为灰度图
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 计算图像统计特征
mean = np.mean(gray)
std_dev = np.std(gray)
# 检查是否为纯黑或纯白
is_black = mean < 10 and std_dev < 5
is_white = mean > 245 and std_dev < 5
# 检查是否有足够的细节
has_detail = std_dev > threshold
return is_black or is_white or not has_detail
except Exception as e:
print(f"[警告] 检查无信息帧时出错: {str(e)}")
return True
@staticmethod
def transcribe_audio(video_path: str, model_name: str = WHISPER_MODEL) -> list:
"""语音识别与时间戳获取(支持中英文混合)"""
try:
# 使用更大的模型提高准确率
model = whisper.load_model(model_name, device="cpu", download_root=MODEL_DIR)
# 配置转写参数
result = model.transcribe(
video_path,
fp16=False,
language="zh",
task="transcribe",
verbose=True,
initial_prompt="这是一段包含中英文的PPT讲解视频可能包含专业术语。"
)
segments = result.get("segments", [])
# 后处理:专业术语替换
for seg in segments:
text = seg["text"]
for cn, en in PROFESSIONAL_TERMS.items():
text = text.replace(cn, f"{cn}({en})")
seg["text"] = text
return segments
except Exception as e:
print(f"[错误] 语音识别失败: {str(e)}")
return []
# ---------------------- 业务逻辑模块 ----------------------
class ContentAligner:
@staticmethod
def generate_page_intervals(timestamps: list, duration: float) -> list:
"""生成页面时间段"""
intervals = []
for i in range(len(timestamps)):
start = timestamps[i]
end = timestamps[i + 1] if i < len(timestamps) - 1 else duration
intervals.append((start, end))
return intervals
@staticmethod
def calculate_text_similarity(text1: str, text2: str) -> float:
"""计算文本相似度"""
# 使用简单的词重叠度计算
words1 = set(re.findall(r'\w+', text1.lower()))
words2 = set(re.findall(r'\w+', text2.lower()))
if not words1 or not words2:
return 0.0
intersection = words1.intersection(words2)
union = words1.union(words2)
return len(intersection) / len(union)
@staticmethod
def find_best_match(segments: list, intervals: list) -> dict:
"""为每个语音片段找到最佳匹配的页面"""
page_texts = defaultdict(list)
unmatched_segments = []
for seg in segments:
seg_start = seg["start"]
best_match = None
best_score = 0.0
# 1. 首先尝试时间戳匹配
for page_idx, (start, end) in enumerate(intervals):
if start <= seg_start < end:
best_match = page_idx
break
# 2. 如果时间戳匹配失败,尝试文本相似度匹配
if best_match is None:
for page_idx, (start, end) in enumerate(intervals):
# 获取该页面的所有文本
page_text = " ".join([s["text"] for s in segments if start <= s["start"] < end])
similarity = ContentAligner.calculate_text_similarity(seg["text"], page_text)
if similarity > best_score:
best_score = similarity
best_match = page_idx
# 3. 如果找到匹配,添加到对应页面
if best_match is not None:
page_texts[best_match].append(seg)
else:
unmatched_segments.append(seg)
# 4. 处理未匹配的片段
if unmatched_segments:
print(f"[警告] 发现{len(unmatched_segments)}个未匹配的语音片段")
# 将未匹配片段添加到最近的页面
for seg in unmatched_segments:
closest_page = min(range(len(intervals)),
key=lambda i: abs(seg["start"] - (intervals[i][0] + intervals[i][1]) / 2))
page_texts[closest_page].append(seg)
return page_texts
@staticmethod
def align_content(video_path: str, timestamps: list) -> list:
"""语音-画面对齐主逻辑(改进版)"""
try:
reader = imageio.get_reader(video_path)
duration = reader.get_meta_data()["duration"]
reader.close()
except Exception:
duration = timestamps[-1] + FRAME_INTERVAL
segments = VideoProcessor.transcribe_audio(video_path)
intervals = ContentAligner.generate_page_intervals(timestamps, duration)
# 使用改进的匹配算法
page_texts = ContentAligner.find_best_match(segments, intervals)
# 生成最终的对齐数据
aligned_data = []
for idx in range(len(intervals)):
text = " ".join([seg["text"] for seg in page_texts.get(idx, [])])
aligned_data.append({
"page": idx,
"start_time": intervals[idx][0],
"end_time": intervals[idx][1],
"text": text
})
return aligned_data
# ---------------------- 摘要生成模块 ----------------------
class SummaryGenerator:
@staticmethod
def optimize_text(text: str) -> str:
"""文本浓缩优化"""
sentences = re.split(r'[。!?]', text)
filtered = []
seen = set()
for sent in sentences:
sent = sent.strip()
if (len(sent) >= 10
and not any(word in sent for word in TRANSITION_WORDS)
and sent not in seen):
filtered.append(sent)
seen.add(sent)
return '。'.join(filtered) + '。' if filtered else ""
@staticmethod
def generate_html(aligned_data: list, keyframes: list, output_dir: str):
"""生成HTML报告"""
pages_data = []
temp_img_dir = os.path.join(output_dir, "_temp_images")
os.makedirs(temp_img_dir, exist_ok=True)
try:
for idx, frame in enumerate(keyframes):
img_path = os.path.join(temp_img_dir, f"page_{idx}.jpg")
frame.save(img_path)
with open(img_path, "rb") as f:
img_data = base64.b64encode(f.read()).decode("utf-8")
pages_data.append({
"num": idx + 1,
"time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s",
"image": f"data:image/jpeg;base64,{img_data}",
"text": SummaryGenerator.optimize_text(aligned_data[idx]["text"])
})
env = Environment()
template = env.from_string("""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>PPT视频摘要报告</title>
<style>
.page { margin: 20px; padding: 15px; border: 1px solid #eee; }
img { max-width: 800px; height: auto; }
.timestamp { color: #666; font-size: 0.9em; }
.content { margin-top: 10px; }
</style>
</head>
<body>
<h1>PPT视频结构化摘要</h1>
{% for page in pages %}
<div class="page">
<h2>页面 {{ page.num }}</h2>
<div class="timestamp">{{ page.time }}</div>
<img src="{{ page.image }}" alt="页面截图">
<div class="content">{{ page.text }}</div>
</div>
{% endfor %}
</body>
</html>
""")
output_path = os.path.join(output_dir, "summary.html")
with open(output_path, "w", encoding="utf-8") as f:
f.write(template.render(pages=pages_data))
print(f"[输出] HTML报告已生成: {output_path}")
finally:
for f in os.listdir(temp_img_dir):
os.remove(os.path.join(temp_img_dir, f))
os.rmdir(temp_img_dir)
@staticmethod
def generate_pdf(aligned_data: list, keyframes: list, output_dir: str):
"""生成PDF报告优化版"""
temp_html = os.path.join(output_dir, "_temp_pdf.html")
temp_img_dir = os.path.join(output_dir, "_temp_pdf_images")
os.makedirs(temp_img_dir, exist_ok=True)
try:
# 使用绝对路径
abs_temp_img_dir = os.path.abspath(temp_img_dir)
html_content = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<style>
@page {
margin: 20mm;
size: A4;
}
body {
font-family: "Microsoft YaHei", "SimSun", sans-serif;
line-height: 1.6;
color: #333;
}
.page {
page-break-inside: avoid;
margin-bottom: 30px;
padding: 20px;
border: 1px solid #eee;
border-radius: 5px;
}
.page-number {
text-align: center;
font-size: 24pt;
font-weight: bold;
margin-bottom: 20px;
color: #2c3e50;
}
.timestamp {
color: #666;
font-size: 12pt;
margin-bottom: 15px;
}
.image-container {
text-align: center;
margin: 20px 0;
}
img {
max-width: 90% !important;
height: auto;
display: block;
margin: 0 auto;
box-shadow: 0 2px 5px rgba(0,0,0,0.1);
}
.content {
font-size: 14pt;
line-height: 1.8;
margin-top: 20px;
padding: 15px;
background: #f9f9f9;
border-radius: 5px;
}
.professional-term {
color: #2980b9;
font-weight: bold;
}
</style>
</head>
<body>
<h1 style="text-align: center; color: #2c3e50; margin-bottom: 40px;">PPT视频结构化摘要</h1>
{% for page in pages %}
<div class="page">
<div class="page-number"> {{ page.num }} </div>
<div class="timestamp">时间区间{{ page.time }}</div>
<div class="image-container">
<img src="{{ page.image_path }}" alt="页面截图">
</div>
<div class="content">{{ page.text }}</div>
</div>
{% endfor %}
</body>
</html>
"""
pages_data = []
for idx, frame in enumerate(keyframes):
img_filename = f"page_{idx}.jpg"
img_path = os.path.join(abs_temp_img_dir, img_filename)
frame.save(img_path)
pages_data.append({
"num": idx + 1,
"time": f"{aligned_data[idx]['start_time']:.1f}s - {aligned_data[idx]['end_time']:.1f}s",
"image_path": img_path,
"text": SummaryGenerator.optimize_text(aligned_data[idx]["text"])
})
env = Environment()
template = env.from_string(html_content)
with open(temp_html, "w", encoding="utf-8") as f:
f.write(template.render(pages=pages_data))
# PDF生成选项
options = {
"enable-local-file-access": "",
"encoding": "UTF-8",
"margin-top": "20mm",
"margin-bottom": "20mm",
"margin-left": "20mm",
"margin-right": "20mm",
"no-stop-slow-scripts": "",
"quiet": "",
"dpi": "300",
"image-quality": "100",
"enable-smart-shrinking": "",
"print-media-type": ""
}
config = pdfkit.configuration(wkhtmltopdf=WKHTMLTOPDF_PATH)
pdf_path = os.path.join(output_dir, "summary.pdf")
pdfkit.from_file(
temp_html,
pdf_path,
configuration=config,
options=options
)
print(f"[输出] PDF报告已生成: {pdf_path}")
finally:
# 清理临时文件
if os.path.exists(temp_html):
os.remove(temp_html)
if os.path.exists(temp_img_dir):
for f in os.listdir(temp_img_dir):
os.remove(os.path.join(temp_img_dir, f))
os.rmdir(temp_img_dir)
@classmethod
def generate_all(cls, aligned_data: list, keyframes: list, output_dir: str):
"""生成所有格式报告"""
cls.generate_html(aligned_data, keyframes, output_dir)
cls.generate_pdf(aligned_data, keyframes, output_dir)
# ---------------------- 主流程控制 ----------------------
def main_process():
# 环境检查
processor = VideoProcessor()
if not processor.check_ffmpeg():
return
if not os.path.exists(VIDEO_PATH):
print(f"[错误] 视频文件不存在: {VIDEO_PATH}")
return
# 关键帧提取
keyframes, timestamps = processor.extract_keyframes(VIDEO_PATH)
if not keyframes:
print("[错误] 未提取到关键帧")
return
# 内容对齐
aligned_data = ContentAligner.align_content(VIDEO_PATH, timestamps)
if not aligned_data:
print("[警告] 未识别到有效语音内容")
# 生成摘要
os.makedirs(OUTPUT_DIR, exist_ok=True)
SummaryGenerator.generate_all(aligned_data, keyframes, OUTPUT_DIR)
if __name__ == "__main__":
main_process()

Binary file not shown.

File diff suppressed because it is too large.


@@ -1,44 +0,0 @@
# 1. Choose the base image (pinning a specific version is recommended)
FROM python:3.10
# 2. Set the working directory
WORKDIR /app
# 3. Update the apt package lists and install system dependencies
# - build-essential: needed to compile the C/C++ code some Python packages require
# - ffmpeg: required by moviepy and imageio-ffmpeg
# - libgl1-mesa-glx, libglib2.0-0: runtime libraries opencv-python may need
# - wkhtmltopdf: tool required by pdfkit
# --no-install-recommends keeps unnecessary packages out
# Finally, clean the apt cache to reduce image size
RUN apt-get update && \
apt-get install -y --no-install-recommends \
build-essential \
ffmpeg \
libgl1-mesa-glx \
libglib2.0-0 \
wkhtmltopdf \
&& rm -rf /var/lib/apt/lists/*
# 4. (Recommended) Copy requirements.txt into the image first
# Copying requirements.txt and installing dependencies before the rest of the code exploits Docker's layer cache:
# this layer and the ones after it are rebuilt only when requirements.txt changes
COPY requirements.txt .
# 5. Install the Python dependencies
# --no-cache-dir reduces the image size
# -r requirements.txt installs from the file
RUN pip install --no-cache-dir -r requirements.txt
# 6. Copy the Flask application code into the image
COPY . .
# 7. Declare the port the Flask app listens on (5000 by default)
EXPOSE 5000
# 8. Define the command to run when the container starts
# Gunicorn or uWSGI is usually better in production, but for development flask run is fine
# Make sure Flask listens on 0.0.0.0 so it is reachable from outside the container
CMD ["flask", "run", "--host=0.0.0.0"]
# Or, if your entry point is app.py:
# CMD ["python", "app.py"]
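A minimal usage sketch, assuming the image is tagged ppt-summarizer and that FLASK_APP resolves to the application module (for example the repo's server.py); both names here are illustrative, not taken from the Dockerfile itself:
docker build -t ppt-summarizer .
docker run --rm -p 5000:5000 -e FLASK_APP=server.py ppt-summarizer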


@@ -1,14 +0,0 @@
numpy>=1.21.0
opencv-python>=4.5.3
Pillow>=8.3.1
imageio>=2.9.0
imageio-ffmpeg>=0.4.5
scikit-image>=0.18.3
scipy>=1.7.1
openai-whisper>=20231117
pdfkit>=1.0.0
Jinja2>=3.0.1
moviepy>=1.0.3
reportlab>=3.6.8
torch>=1.9.0
tqdm>=4.62.3


@@ -1,7 +0,0 @@
@echo off
echo Starting the batch video processing system...
start "" python server.py
timeout /t 3
start http://localhost:5000
echo Service started. If the browser does not open automatically, visit http://localhost:5000 manually.
pause

Binary file not shown.