"""Gradio web UI bundling a few local helper tools.

Tabs: single-file audio transcription (ffmpeg + a whisper command-line
binary), batch audio transcription, Markdown -> Word conversion via pandoc,
PDF download through a pasted cURL command, single-image OCR through a local
script, and batch OCR through an OpenAI-compatible vision endpoint.
"""
import base64
import io
import os
import re
import subprocess

import gradio as gr
from dotenv import load_dotenv
from openai import OpenAI
from PIL import Image

load_dotenv()

# Directory that holds the whisper binary and its models; it is also used as
# the prefix of the `-of` output path, so transcripts are written here.
WHISPER_DIR = "/home/tmfc/apps/whisper/"


def _to_millis(seconds):
    """Return *seconds* as an integer-millisecond string for the whisper CLI.

    A cleared ``gr.Number`` field delivers ``None``; that is treated as 0.
    """
    return str(int((seconds or 0) * 1000))


def convert_to_wav(audio_file):
    """Convert *audio_file* to a mono 16 kHz WAV using ffmpeg.

    The result is always written to ``output.wav`` in the current working
    directory (overwriting any previous one) and that path is returned.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    output_wav_file = "output.wav"
    print(f"开始转换音频文件 {audio_file} 为 WAV 格式...")
    # check=True: without it a failed conversion would go unnoticed and the
    # caller would transcribe a stale output.wav from an earlier run.
    subprocess.run(
        [
            "ffmpeg", "-y",
            "-i", audio_file,
            "-ac", "1",
            "-ar", "16000",
            output_wav_file,
        ],
        check=True,
    )
    print(f"音频文件 {audio_file} 已转换为 WAV 格式,输出文件为 {output_wav_file}")
    return output_wav_file


def transcribe_audio(wav_file, original_filename, offset_time, duration_time):
    """Transcribe *wav_file* with the whisper command-line program.

    Args:
        wav_file: path of the WAV file to transcribe.
        original_filename: base name used for the transcript file.
        offset_time: start offset in seconds (``None`` is treated as 0).
        duration_time: duration in seconds (``None`` is treated as 0).

    Returns:
        Path of the ``.txt`` transcript, i.e. the ``-of`` prefix passed to
        whisper plus ``.txt``.

    Raises:
        subprocess.CalledProcessError: if whisper exits with a non-zero status.
    """
    output_prefix = WHISPER_DIR + original_filename
    whisper_cmd = [
        "/home/tmfc/apps/whisper/main",
        "--language", "Chinese",
        "-otxt",
        "-ot", _to_millis(offset_time),
        "-d", _to_millis(duration_time),
        "-of", output_prefix,
        "-m", "/home/tmfc/apps/whisper/models/ggml-large-v3-q5_0.bin",
        wav_file,
    ]
    print(whisper_cmd)
    print(f"开始转写音频文件 {wav_file}...")
    result = subprocess.run(whisper_cmd, capture_output=True, text=True)
    print(result)
    # Checked after printing so the captured output is still logged on failure.
    result.check_returncode()

    # The transcript lives next to the `-of` prefix, not in the working
    # directory, so return that exact path.
    txt_file = output_prefix + ".txt"
    print(f"音频文件 {wav_file} 转写完成,结果已保存为 {txt_file}")
    return txt_file


def process_audio(audio_file, offset_time, duration_time):
    """Convert an uploaded audio file to WAV and transcribe it.

    Returns the path of the transcript file. Raises ``gr.Error`` when no file
    was uploaded.
    """
    if not audio_file:
        raise gr.Error("请先上传音频文件")
    print("开始处理音频文件...")
    # The transcript is named after the uploaded file.
    original_filename = os.path.basename(audio_file)
    wav_file = convert_to_wav(audio_file)
    txt_file = transcribe_audio(wav_file, original_filename, offset_time, duration_time)
    print("音频文件处理完成")
    return txt_file


def direct_transcribe(audio_file, offset_time, duration_time):
    """Transcribe the existing ``output.wav`` without converting again.

    *audio_file* is accepted because the UI passes it, but it is not used:
    the function always works on ``output.wav``.
    """
    print("开始直接转写音频文件...")
    txt_file = transcribe_audio("output.wav", "output.wav", offset_time, duration_time)
    print("音频文件直接转写完成")
    return txt_file


batch_directory = '/mnt/d/share/audio/'


def list_files():
    """Return the names of the ``.mp3``/``.m4a`` files in ``batch_directory``.

    Sub-directories are skipped. A missing directory yields an empty list
    instead of an exception. Names are sorted for a stable order.
    """
    if not os.path.isdir(batch_directory):
        return []
    return sorted(
        name
        for name in os.listdir(batch_directory)
        if name.lower().endswith(('.mp3', '.m4a'))
        and os.path.isfile(os.path.join(batch_directory, name))
    )


log_content = ""


def batch_transcribe():
    """Convert and transcribe every audio file found by ``list_files``.

    Progress is appended to the module-level ``log_content``. A file whose
    conversion or transcription fails is logged and skipped so the remaining
    files are still processed.

    Returns:
        List of transcript paths for the files that succeeded.
    """
    global log_content
    result_files = []
    for file in list_files():
        log_content += "转换" + file + "为 wav\n"
        try:
            wav_file = convert_to_wav(os.path.join(batch_directory, file))
            log_content += "转换wav成功,开始转写\n"
            # NOTE(review): batch mode starts at a 10 s offset with duration 0
            # (presumably "until the end") - confirm this is intended.
            txt_file = transcribe_audio(wav_file, file, 10, 0)
        except (subprocess.CalledProcessError, OSError) as e:
            log_content += "处理 " + file + " 失败: " + str(e) + "\n"
            continue
        log_content += "转写 " + file + "完成\n"
        result_files.append(txt_file)
    return result_files


def display_files():
    """Return the batch file names as newline-separated text."""
    return "\n".join(list_files())


def get_log():
    """Return the accumulated batch-transcription log text."""
    return log_content


def update_log_output():
    """Build a Gradio update that sets a component's value to the current log."""
    return gr.update(value=get_log())


def convert_to_docx(text):
    """Convert Markdown *text* to ``file.docx`` with pandoc.

    The text is written to ``file.md`` (UTF-8) and converted in the current
    working directory.

    Returns:
        The path ``"file.docx"``.

    Raises:
        gr.Error: if *text* is empty or pandoc fails. The output component is
            a file widget, so a returned error string would be mistaken for a
            file path; raising shows the message to the user instead.
    """
    if not text or not text.strip():
        raise gr.Error("输入框不能为空!")

    with open("file.md", "w", encoding="utf-8") as file:
        file.write(text)

    try:
        subprocess.run(["pandoc", "file.md", "-o", "file.docx"], check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        raise gr.Error(f"转换失败: {e}") from e

    return "file.docx"


# Matches a single-quoted `-H 'if-none-match: ...'` or `-H 'range: ...'`
# argument. `[^']*` stops at the closing quote, so the match never runs into
# later arguments even when the whole command is on one line.
_UNWANTED_HEADER_RE = re.compile(
    r"-H\s*'(?:if-none-match|range):[^']*'\s*",
    re.IGNORECASE,
)


def remove_headers(curl_command):
    """Strip the ``if-none-match`` and ``range`` headers from a cURL command."""
    return _UNWANTED_HEADER_RE.sub('', curl_command)


def do_download_pdf_file(curl_command, pdf_filename):
    """Run a pasted cURL command and save its output as a PDF.

    Args:
        curl_command: shell command text; conditional/range headers are
            removed before it is run.
        pdf_filename: file name without extension; empty means ``download``.

    Returns:
        ``(message, path)`` where *path* is ``None`` on failure.
    """
    curl_command = remove_headers(curl_command or "")
    if not curl_command.strip():
        return "Failed to download file. Error: empty command", None

    # Keep only the final path component so the name cannot escape the
    # working directory.
    safe_name = os.path.basename((pdf_filename or "").strip())
    temp_file_path = f"./{safe_name}.pdf" if safe_name else "./download.pdf"

    try:
        # SECURITY NOTE(review): the pasted text is executed by the shell, so
        # anyone who can reach this UI can run arbitrary commands. Kept as-is
        # because browser-exported cURL commands rely on shell quoting;
        # restrict network access to this app accordingly.
        result = subprocess.run(
            curl_command,
            shell=True,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except subprocess.CalledProcessError as e:
        # errors="replace": stderr is not guaranteed to be valid UTF-8.
        return f"Failed to download file. Error: {e.stderr.decode(errors='replace')}", None

    with open(temp_file_path, 'wb') as f:
        f.write(result.stdout)
    return "File downloaded successfully", temp_file_path


def download_pdf(curl_command, pdf_filename):
    """Gradio handler: download a PDF and return its path.

    Raises ``gr.Error`` with the failure message when the download fails, so
    the reason is shown instead of being discarded.
    """
    message, file_path = do_download_pdf_file(curl_command, pdf_filename)
    if file_path is None:
        raise gr.Error(message)
    return file_path


def run_ocr(image):
    """Run the local OCR script on a PIL *image* and return the recognized text.

    The image is saved to a fixed path, the script's stdout is redirected to a
    fixed text file, and that file's content is returned. On failure a
    ``识别失败: ...`` message is returned instead.
    """
    if image is None:
        return "识别失败: 未上传图片"

    image_path = "/home/tmfc/apps/got-ocr/img.png"
    image.save(image_path)

    command = [
        "sudo", "-u", "tmfc",
        "/home/tmfc/miniconda3/envs/got/bin/python3",
        "/home/tmfc/apps/got-ocr/GOT/demo/run_ocr_2.0_crop.py",
        "--model-name", "/home/tmfc/apps/got-ocr/models/",
        "--image-file", image_path,
    ]
    out_file = "/home/tmfc/apps/got-ocr/img.txt"
    try:
        with open(out_file, 'w') as f:
            # check=True makes the except branch below reachable; without it a
            # failing script would return whatever partial output was written.
            subprocess.run(command, stdout=f, stderr=subprocess.PIPE, text=True, check=True)
    except subprocess.CalledProcessError as e:
        return f"识别失败: {e}"

    with open(out_file, 'r', encoding='utf-8') as f:
        return f.read()


def ocr_image(image):
    """Send a PIL *image* to the GLM vision model and return the recognized text.

    The image is JPEG-encoded, base64-embedded in a data URL and sent with a
    prompt asking for plain-text output. The API key is read from the
    ``GLM_API_KEY`` environment variable.
    """
    # JPEG cannot store alpha or palette modes (e.g. RGBA/P from PNG files),
    # so convert those to RGB before encoding.
    if image.mode not in ("RGB", "L"):
        image = image.convert("RGB")

    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    image_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")

    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "识别图片中的文字并以纯文本(txt)格式输出",
                },
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{image_base64}",
                    },
                },
            ],
        }
    ]
    client = OpenAI(
        api_key=os.getenv("GLM_API_KEY"),
        base_url="https://open.bigmodel.cn/api/paas/v4/",
    )
    chat_completion = client.chat.completions.create(
        messages=messages,
        model="glm-4v-plus",
    )
    return chat_completion.choices[0].message.content


def batch_ocr(files):
    """OCR every uploaded image and write the results to ``ocr_results.txt``.

    Args:
        files: uploaded files as delivered by the multi-file upload widget;
            ``None`` (nothing uploaded) is treated as an empty list.

    Returns:
        The path ``"ocr_results.txt"``.
    """
    results = []
    for file in files or []:
        # Context manager closes the underlying image file after each request.
        with Image.open(file) as image:
            ocr_result = ocr_image(image)
        results.append(f"\n\n{ocr_result}\n\n")

    output_file = "ocr_results.txt"
    with open(output_file, "w", encoding="utf-8") as f:
        f.write("\n".join(results))
    return output_file


# NOTE(review): the container nesting below was reconstructed from the order
# of the calls; verify the layout against the running app.
with gr.Blocks() as iface:
    gr.Markdown("# 大模型工具集")
    with gr.Tabs():
        with gr.TabItem("音频转写"):
            with gr.Row():
                audio_input = gr.Audio(type="filepath", label="上传音频文件")
                with gr.Column():
                    offset_input = gr.Number(label="偏移时间 (秒)")
                    duration_input = gr.Number(label="转写时长 (秒)")
            with gr.Row():
                process_button = gr.Button("处理并转写")
                direct_transcribe_button = gr.Button("直接转写")
            output_file = gr.File(label="转写结果")
            process_button.click(
                process_audio,
                inputs=[audio_input, offset_input, duration_input],
                outputs=output_file,
            )
            direct_transcribe_button.click(
                direct_transcribe,
                inputs=[audio_input, offset_input, duration_input],
                outputs=output_file,
            )

        with gr.TabItem("音频批量转写"):
            with gr.Row():
                link_upload = gr.HTML(value='点击上传文件')
            with gr.Row():
                file_list = gr.Textbox(label="文件列表")
                with gr.Column():
                    list_file_button = gr.Button("刷新文件")
                    batch_process_button = gr.Button("批量处理")
                with gr.Column():
                    batch_output_file = gr.File(label="批量转写结果")
            with gr.Row():
                log_output = gr.Textbox(label="日志信息", lines=10)
            list_file_button.click(fn=display_files, outputs=file_list)
            batch_process_button.click(batch_transcribe, outputs=batch_output_file)

        with gr.Tab("Markdown 转 Word"):
            gr.Markdown("## Markdown 转 Word 转换器")
            with gr.Row():
                text_input = gr.Textbox(lines=10, placeholder="请在此输入 Markdown 内容...")
            with gr.Row():
                convert_button = gr.Button("转换")
            with gr.Row():
                output_file = gr.File(label="下载转换后的文件")
            convert_button.click(convert_to_docx, inputs=text_input, outputs=output_file)

        with gr.Tab("下载pdf"):
            gr.Markdown("## pdf 下载指令修复")
            with gr.Row():
                curl_text_input = gr.Textbox(lines=10, placeholder="请在此输入cURL脚本...")
            with gr.Row():
                with gr.Column():
                    pdf_filename = gr.Textbox(placeholder="输入文件名")
                with gr.Column():
                    pdf_download_button = gr.Button("下载")
            with gr.Row():
                pdf_download_result = gr.File(label="下载pdf文件")
            pdf_download_button.click(
                download_pdf,
                inputs=[curl_text_input, pdf_filename],
                outputs=pdf_download_result,
            )

        with gr.Tab("图片识别"):
            gr.Markdown("## OCR 图片识别")
            with gr.Row():
                with gr.Column():
                    image_input = gr.Image(type="pil", label="上传图片")
                with gr.Column():
                    btn_recognize = gr.Button("识别")
                    text_output = gr.Textbox(label="OCR 识别结果")
            btn_recognize.click(fn=run_ocr, inputs=image_input, outputs=text_output)

        with gr.Tab("批量图片识别"):
            gr.Markdown("## OCR 图片批量识别")
            with gr.Row():
                with gr.Column():
                    input_files = gr.File(file_count="multiple", label="Upload Images")
                with gr.Column():
                    output_file = gr.File(label="Download OCR Results")
            with gr.Row():
                process_button = gr.Button("Process Images")
            # Bind the button click to the batch OCR handler.
            process_button.click(batch_ocr, inputs=input_files, outputs=output_file)

    # Refresh the batch log textbox once per second; event listeners must be
    # registered inside the Blocks context.
    iface.load(fn=update_log_output, outputs=[log_output], every=1)

iface.launch(server_name="0.0.0.0")