271 lines
9.2 KiB
Python
271 lines
9.2 KiB
Python
import gradio as gr
|
||
import subprocess
|
||
import os
|
||
import re
|
||
|
||
|
||
def convert_to_wav(audio_file):
    """Convert an audio file to a mono, 16 kHz WAV file using ffmpeg.

    Args:
        audio_file: Path of the input audio file passed to ``ffmpeg -i``.

    Returns:
        The output path, always ``"output.wav"`` (relative to the current
        working directory). An existing file is overwritten (``-y``).

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    output_wav_file = "output.wav"
    print(f"开始转换音频文件 {audio_file} 为 WAV 格式...")
    # check=True: without it a failed conversion was still reported as a
    # success and a stale output.wav from an earlier run would be used.
    subprocess.run(
        [
            "ffmpeg", "-y", "-i", audio_file,
            "-ac", "1",       # one audio channel
            "-ar", "16000",   # 16000 Hz sample rate
            output_wav_file,
        ],
        check=True,
    )
    print(f"音频文件 {audio_file} 已转换为 WAV 格式,输出文件为 {output_wav_file}")
    return output_wav_file
|
||
|
||
|
||
def transcribe_audio(wav_file, original_filename, offset_time, duration_time):
    """Transcribe a WAV file with the whisper command-line program.

    The program is asked for text output (``-otxt``) with the output base
    path ``/home/tmfc/apps/whisper/<original_filename>``.

    Args:
        wav_file: Path of the WAV file to transcribe.
        original_filename: Name used for the ``-of`` output base and for the
            returned ``.txt`` file name.
        offset_time: Start offset in seconds; ``None`` is treated as 0.
        duration_time: Duration in seconds; ``None`` is treated as 0.

    Returns:
        ``original_filename + ".txt"``.
    """
    # The UI number fields deliver None when left empty and floats otherwise;
    # normalise both to whole milliseconds for the -ot / -d options.
    offset_ms = int((offset_time or 0) * 1000)
    duration_ms = int((duration_time or 0) * 1000)

    whisper_cmd = [
        "/home/tmfc/apps/whisper/main",
        "--language", "Chinese",
        "-otxt",
        "-ot", str(offset_ms),
        "-d", str(duration_ms),
        "-of", "/home/tmfc/apps/whisper/" + original_filename,
        "-m", "/home/tmfc/apps/whisper/models/ggml-large-v3-q5_0.bin",
        wav_file,
    ]
    print(whisper_cmd)
    print(f"开始转写音频文件 {wav_file}...")
    result = subprocess.run(whisper_cmd, capture_output=True, text=True)
    print(result)

    # NOTE(review): the returned path is relative while -of is absolute, so
    # this presumably relies on the app running with /home/tmfc/apps/whisper
    # as its working directory -- confirm.
    txt_file = original_filename + ".txt"
    print(f"音频文件 {wav_file} 转写完成,结果已保存为 {txt_file}")

    return txt_file
|
||
|
||
|
||
def process_audio(audio_file, offset_time, duration_time):
    """Convert an uploaded audio file to WAV and transcribe it.

    Args:
        audio_file: Path of the uploaded audio file.
        offset_time: Forwarded unchanged to ``transcribe_audio``.
        duration_time: Forwarded unchanged to ``transcribe_audio``.

    Returns:
        The transcript file path returned by ``transcribe_audio``.
    """
    print("开始处理音频文件...")

    # The base name of the upload names the transcript.
    upload_name = os.path.basename(audio_file)
    converted_wav = convert_to_wav(audio_file)
    transcript_path = transcribe_audio(
        converted_wav,
        upload_name,
        offset_time,
        duration_time,
    )

    print("音频文件处理完成")
    return transcript_path
|
||
|
||
|
||
def direct_transcribe(audio_file, offset_time, duration_time):
    """Transcribe the existing ``output.wav`` without running a conversion.

    Args:
        audio_file: Accepted for signature compatibility with the UI wiring;
            not used by this function.
        offset_time: Forwarded unchanged to ``transcribe_audio``.
        duration_time: Forwarded unchanged to ``transcribe_audio``.

    Returns:
        The transcript file path returned by ``transcribe_audio``.
    """
    print("开始直接转写音频文件...")

    # The same fixed name serves as both the input file and the output name.
    existing_wav = "output.wav"
    transcript_path = transcribe_audio(
        existing_wav, existing_wav, offset_time, duration_time
    )

    print("音频文件直接转写完成")
    return transcript_path
|
||
|
||
|
||
batch_directory = '/mnt/d/share/audio/'


def list_files(directory=None):
    """List the ``.mp3`` / ``.m4a`` files directly inside a directory.

    Args:
        directory: Directory to scan. Defaults to ``batch_directory`` when
            omitted or ``None``.

    Returns:
        A sorted list of file names (not full paths). Sub-directories are
        skipped and the extension match is case-insensitive.
    """
    if directory is None:
        directory = batch_directory
    # Sorted because os.listdir returns entries in arbitrary order.
    return sorted(
        name
        for name in os.listdir(directory)
        if name.lower().endswith(('.mp3', '.m4a'))
        and os.path.isfile(os.path.join(directory, name))
    )
|
||
|
||
|
||
log_content = ""


def batch_transcribe():
    """Convert and transcribe every file returned by ``list_files``.

    Progress lines are appended to the module-level ``log_content`` string.

    Returns:
        A list with the transcript path returned by ``transcribe_audio`` for
        each processed file, in processing order.
    """
    global log_content
    result_file = []
    for file in list_files():
        log_content += f"转换{file}为 wav\n"
        # os.path.join keeps the path valid whether or not batch_directory
        # ends with a separator.
        wav_file = convert_to_wav(os.path.join(batch_directory, file))
        log_content += "转换wav成功,开始转写\n"

        # NOTE(review): offset_time=10 and duration_time=0 are hard-coded
        # here; confirm that skipping the first 10 seconds is intended.
        txt_file = transcribe_audio(wav_file, file, 10, 0)
        log_content += f"转写 {file}完成\n"
        result_file.append(txt_file)
    return result_file
|
||
|
||
|
||
def display_files():
    """Return the names from ``list_files`` as one newline-separated string."""
    listing = "\n".join(list_files())
    return listing
|
||
|
||
|
||
def get_log():
    """Return the text currently held in the module-level ``log_content``."""
    # Reading a module-level name does not need a ``global`` declaration.
    return log_content
|
||
|
||
|
||
def update_log_output():
    """Return a ``gr.update`` whose value is the current log text."""
    current_log = get_log()
    return gr.update(value=current_log)
|
||
|
||
|
||
def convert_to_docx(text):
    """Convert Markdown text to a Word document using pandoc.

    The text is written to ``file.md`` in the current working directory and
    pandoc is run to produce ``file.docx`` next to it.

    Args:
        text: Markdown source. ``None``, empty and whitespace-only input are
            rejected.

    Returns:
        ``"file.docx"`` on success; otherwise a message string describing
        the problem (empty input, or pandoc missing / failing).
    """
    # NOTE(review): the UI wires this return value to a gr.File output, so
    # the message strings below are handed to a file component -- confirm
    # that this is the intended way to surface errors.
    if text is None or text.strip() == "":
        return "输入框不能为空!"

    # Explicit UTF-8 so the result does not depend on the process locale.
    with open("file.md", "w", encoding="utf-8") as file:
        file.write(text)

    try:
        subprocess.run(["pandoc", "file.md", "-o", "file.docx"], check=True)
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        # FileNotFoundError: the pandoc executable is not installed / on PATH.
        return f"转换失败: {e}"

    return "file.docx"
|
||
|
||
|
||
def remove_headers(curl_command):
    """Strip ``if-none-match`` and ``range`` headers from a cURL command.

    Only single-quoted ``-H '<name>: <value>'`` options are recognised; the
    header name is matched case-insensitively. Whitespace following a
    removed option is removed with it.

    Args:
        curl_command: The cURL command line as text.

    Returns:
        The command text with the matching ``-H`` options removed.
    """
    # [^']* stops at the header's own closing quote. The previous [^\\]*
    # was greedy across quotes and, on a command without backslash line
    # continuations, deleted everything up to the last quote in the command.
    header_option = r"-H\s*'(?:if-none-match|range):[^']*'\s*"
    return re.sub(header_option, '', curl_command, flags=re.IGNORECASE)
|
||
|
||
|
||
def do_download_pdf_file(curl_command, pdf_filename):
    """Run a pasted cURL command and save its standard output as a PDF file.

    The command text comes from the web UI, so it is never handed to a
    shell: it is split into arguments and executed directly, and only a
    command whose first word is ``curl`` is accepted.

    Args:
        curl_command: cURL command line as text; backslash line
            continuations are supported.
        pdf_filename: Desired file name without extension. Only its base
            name is used; empty or ``None`` selects ``download.pdf``.

    Returns:
        A ``(message, path)`` tuple. ``path`` is the saved file on success
        and ``None`` on any failure.
    """
    import shlex  # local import: only this function needs it

    command_text = (curl_command or "").strip()
    if not command_text:
        return "Failed to download file. Error: empty curl command", None
    if command_text.split(None, 1)[0] != "curl":
        return "Failed to download file. Error: only curl commands are accepted", None

    # Drop the headers that would yield a cached or partial response.
    command_text = remove_headers(command_text)

    # shlex does not treat backslash-newline as a line continuation, so join
    # continued lines before splitting.
    command_text = command_text.replace("\\\r\n", " ").replace("\\\n", " ")
    try:
        argv = shlex.split(command_text)
    except ValueError as e:  # e.g. unbalanced quotes
        return f"Failed to download file. Error: {e}", None

    # NOTE(review): curl's own options (for example -o or -K) can still read
    # or write local files; consider whitelisting options if this UI is
    # reachable by untrusted users.
    try:
        result = subprocess.run(
            argv, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
    except subprocess.CalledProcessError as e:
        return f"Failed to download file. Error: {e.stderr.decode(errors='replace')}", None
    except OSError as e:  # curl executable missing or not runnable
        return f"Failed to download file. Error: {e}", None

    # basename() keeps the download inside the working directory even if the
    # supplied name contains path separators.
    safe_name = os.path.basename((pdf_filename or "").strip())
    temp_file_path = f"./{safe_name}.pdf" if safe_name else "./download.pdf"
    with open(temp_file_path, 'wb') as f:
        f.write(result.stdout)
    return "File downloaded successfully", temp_file_path
|
||
|
||
|
||
def download_pdf(curl_command, pdf_filename):
    """Download via ``do_download_pdf_file`` and return only the file path.

    The status message of the underlying call is discarded; the returned
    path is ``None`` when that call reports a failure.
    """
    _status, saved_path = do_download_pdf_file(curl_command, pdf_filename)
    return saved_path
|
||
|
||
|
||
def run_ocr(image):
    """Run the external OCR script on an uploaded image and return its text.

    The image is saved to a fixed path, the OCR script is run with its
    standard output redirected to a fixed text file, and that file's
    content is returned.

    Args:
        image: Object with a ``save(path)`` method (the UI passes a PIL
            image), or ``None`` when nothing was uploaded.

    Returns:
        The recognised text, or a message string when no image was given or
        the OCR command exited with a non-zero status.
    """
    if image is None:
        return "请先上传图片"

    # Save the uploaded image where the OCR script will read it.
    image_path = "/home/tmfc/apps/got-ocr/img.png"
    image.save(image_path)

    command = [
        "sudo", "-u", "tmfc",
        "/home/tmfc/miniconda3/envs/got/bin/python3", "/home/tmfc/apps/got-ocr/GOT/demo/run_ocr_2.0_crop.py",
        "--model-name", "/home/tmfc/apps/got-ocr/models/",
        "--image-file", image_path,
    ]

    out_file = "/home/tmfc/apps/got-ocr/img.txt"
    try:
        with open(out_file, 'w') as f:
            # check=True makes a failing command reach the except branch;
            # without it that branch could never run.
            subprocess.run(command, stdout=f, stderr=subprocess.PIPE, text=True, check=True)
    except subprocess.CalledProcessError as e:
        return f"识别失败: {e}\n{e.stderr}"

    with open(out_file, 'r', encoding='utf-8') as f:
        return f.read()
|
||
|
||
|
||
# Build the Gradio UI: one tab per tool, each wired to a handler defined above.
# NOTE(review): the container nesting below (which components sit inside
# which Row / Column / Tabs) was reconstructed from statement order only --
# confirm it against the intended layout.
with gr.Blocks() as iface:
    gr.Markdown("# 大模型工具集")
    with gr.Tabs():
        # Tab: single-file transcription.
        with gr.TabItem("音频转写"):
            with gr.Row():
                audio_input = gr.Audio(type="filepath", label="上传音频文件")
                with gr.Column():
                    offset_input = gr.Number(label="偏移时间 (秒)")
                    duration_input = gr.Number(label="转写时长 (秒)")

            with gr.Row():
                process_button = gr.Button("处理并转写")
                direct_transcribe_button = gr.Button("直接转写")

            output_file = gr.File(label="转写结果")

            process_button.click(process_audio, inputs=[audio_input, offset_input, duration_input], outputs=output_file)
            direct_transcribe_button.click(direct_transcribe, inputs=[audio_input, offset_input, duration_input],
                                           outputs=output_file)
        # Tab: batch transcription of the files found by list_files().
        with gr.TabItem("音频批量转写"):
            with gr.Row():
                link_upload = gr.HTML(value='<a href="https://webd.willking.tech" target="_blank">点击上传文件</a>')
            with gr.Row():
                file_list = gr.Textbox(label="文件列表")
                with gr.Column():
                    list_file_button = gr.Button("刷新文件")
                    batch_process_button = gr.Button("批量处理")
                with gr.Column():
                    batch_output_file = gr.File(label="批量转写结果")

            with gr.Row():
                log_output = gr.Textbox(label="日志信息", lines=10)

            list_file_button.click(fn=display_files, outputs=file_list)

            batch_process_button.click(batch_transcribe, outputs=batch_output_file)

        # Tab: Markdown -> Word conversion.
        with gr.Tab("Markdown 转 Word"):
            gr.Markdown("## Markdown 转 Word 转换器")
            with gr.Row():
                text_input = gr.Textbox(lines=10, placeholder="请在此输入 Markdown 内容...")
            with gr.Row():
                convert_button = gr.Button("转换")
            with gr.Row():
                # Rebinds the name output_file; the click handlers of the
                # first tab were already registered with the earlier component.
                output_file = gr.File(label="下载转换后的文件")

            convert_button.click(convert_to_docx, inputs=text_input, outputs=output_file)

        # Tab: download a PDF by running a pasted cURL command.
        with gr.Tab("下载pdf"):
            gr.Markdown("## pdf 下载指令修复")
            with gr.Row():
                curl_text_input = gr.Textbox(lines=10, placeholder="请在此输入cURL脚本...")
            with gr.Row():
                with gr.Column():
                    pdf_filename = gr.Textbox(placeholder="输入文件名")
                with gr.Column():
                    pdf_download_button = gr.Button("下载")
            with gr.Row():
                pdf_download_result = gr.File(label="下载pdf文件")

            pdf_download_button.click(download_pdf, inputs=[curl_text_input, pdf_filename], outputs=pdf_download_result)

        # Tab: OCR on an uploaded image.
        with gr.Tab("图片识别"):
            gr.Markdown("## OCR 图片识别")
            with gr.Row():
                with gr.Column():
                    image_input = gr.Image(type="pil", label="上传图片")
                with gr.Column():
                    btn_recognize = gr.Button("识别")
                    text_output = gr.Textbox(label="OCR 识别结果")

            btn_recognize.click(fn=run_ocr, inputs=image_input, outputs=text_output)

    # Refresh the log textbox from update_log_output on page load and then
    # repeatedly (every=1 -- presumably seconds; confirm for the installed
    # Gradio version).
    iface.load(fn=update_log_output, outputs=[log_output], every=1)

# Listen on all network interfaces.
iface.launch(server_name="0.0.0.0")
|