Commit: Merge pull request #2 from overcrash66/develop (add web app)

Showing 6 changed files with 365 additions and 1 deletion.
OpenTranslator/translator.py
@@ -0,0 +1,165 @@
from transformers import WhisperProcessor, WhisperForConditionalGeneration
import torchaudio
import logging
import librosa
import torch
import sounddevice as sd
from transformers import MBartForConditionalGeneration, MBart50TokenizerFast
from TTS.api import TTS
import time
import os
import unicodedata

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class CustomTranslator:
    def __init__(self, output_dir="output"):
        self.target_language = ""
        self.source_language = ""
        self.translation_method = ""
        self.output_dir = output_dir
        os.makedirs(self.output_dir, exist_ok=True)
        # Initialize other attributes as needed

    def load_models(self):
        self.processor = WhisperProcessor.from_pretrained("openai/whisper-large-v3")
        self.model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-large-v3").to(device)
        # self.tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device)

    def process_audio_chunk(self, input_path, target_language, chunk_idx, output_path, translation_method):
        try:
            if translation_method == 'Local':
                self.load_models()
            start_time = time.time()
            # Load the input audio file using librosa
            input_waveform, input_sampling_rate = librosa.load(input_path, sr=None, mono=True)

            # Convert the NumPy array to a PyTorch tensor if needed
            if not isinstance(input_waveform, torch.Tensor):
                input_waveform = torch.tensor(input_waveform)

            forced_decoder_ids = self.processor.get_decoder_prompt_ids(language=target_language, task="translate")

            # Resample to the 16 kHz rate Whisper expects
            if input_sampling_rate != 16000:
                resampler = torchaudio.transforms.Resample(orig_freq=input_sampling_rate, new_freq=16000)
                input_waveform = resampler(input_waveform)

            # Extract input features with the Whisper processor
            input_features = self.processor(input_waveform.numpy(), sampling_rate=16000, return_tensors="pt")

            # Move the input features to the device used by the model
            input_features = {k: v.to(device) for k, v in input_features.items()}

            # Generate token ids
            predicted_ids = self.model.generate(input_features["input_features"], forced_decoder_ids=forced_decoder_ids)

            # Decode token ids to text
            transcription = self.processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]

            del input_waveform, input_sampling_rate

            end_time = time.time()
            execution_time = (end_time - start_time) / 60
            print(f"Transcription Execution time: {execution_time:.2f} minutes")

            # Bug fix: drop duplicated successive words from the transcription
            words = transcription.split()
            cleaned_words = words[:1]  # slicing also guards against an empty transcription

            for word in words[1:]:
                if word != cleaned_words[-1]:
                    cleaned_words.append(word)

            transcription = ' '.join(cleaned_words)

            # Drop duplicated successive sentences as well
            sentences = transcription.split('.')
            cleaned_sentences = sentences[:1]

            for sentence in sentences[1:]:
                if sentence != cleaned_sentences[-1]:
                    cleaned_sentences.append(sentence)

            transcription = '.'.join(cleaned_sentences)
            print('Speech recognized and translated to English text: ' + str(transcription))
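            # Note: Whisper's "translate" task emits English text; the mBART step
            # below handles translation from English into the requested target language.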

            Translation_chunk_output_path = os.path.join(self.output_dir, f"{os.path.splitext(os.path.basename(output_path))[0]}_Translation_chunk{chunk_idx + 1}.wav")

            # If the target language is English, skip text translation
            if target_language != 'en':
                # Local text translation with mBART-50
                print("Local text translation started..")
                start_time = time.time()
                tt = MBartForConditionalGeneration.from_pretrained("SnypzZz/Llama2-13b-Language-translate").to(device)
                # Tokenizers run on the CPU; from_pretrained takes no device argument
                tokenizer = MBart50TokenizerFast.from_pretrained("SnypzZz/Llama2-13b-Language-translate", src_lang="en_XX")

                # Tokenize and convert to a PyTorch tensor
                inputs = tokenizer(transcription, return_tensors="pt")
                input_ids = inputs["input_ids"].to(device)

                # Map target languages to mBART-50 language codes
                language_mapping = {
                    "en": "en_XX",
                    "es": "es_XX",
                    "fr": "fr_XX",
                    "de": "de_DE",
                    "ja": "ja_XX",
                    "ko": "ko_KR",
                    "tr": "tr_TR",
                    "ar": "ar_AR",
                    "ru": "ru_RU",
                    "he": "he_IL",
                    "hi": "hi_IN",
                    "it": "it_IT",
                    "pt": "pt_XX",
                    "zh": "zh_CN",
                    "cs": "cs_CZ",
                    "nl": "nl_XX",
                    "pl": "pl_PL",
                }

                # Set the target language based on the mapping
                model_target_language = language_mapping.get(target_language, "en_XX")

                # Generate tokens, forcing the first generated token to the target language code
                generated_tokens = tt.generate(input_ids=input_ids, forced_bos_token_id=tokenizer.lang_code_to_id[model_target_language])

                # Decode and join the translated text
                translated_text = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
                translated_text = ", ".join(translated_text)

                logging.info(f"Processing successful. Translated text: {translated_text}")
                end_time = time.time()
                execution_time = (end_time - start_time) / 60
                print(f"Local Translation Execution time: {execution_time:.2f} minutes")

            if target_language == 'en':
                translated_text = transcription

            # Generate the final audio output from the translated text
            self.generate_audio(translated_text, Translation_chunk_output_path, target_language, input_path)

            # Log success
            logging.info(f"Translation successful for {input_path}. Translated text: {translated_text}")
            return translated_text

        except Exception as e:
            # Log errors and re-raise the exception
            logging.error(f"Error processing audio: {e}")
            raise

    def generate_audio(self, text, output_path, target_language, input_path):
        print("Generate audio")

        # Text to speech, written to a file; the input audio doubles as the
        # speaker reference, so XTTS v2 clones the original voice
        start_time = time.time()
        self.tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device)
        self.tts.tts_to_file(text=text, speaker_wav=input_path, language=target_language, file_path=output_path)
        end_time = time.time()
        execution_time = (end_time - start_time) / 60
        print(f"Generate_audio Execution time: {execution_time:.2f} minutes")
(Two of the changed files cannot be displayed.)
@@ -0,0 +1,189 @@
import gradio as gr
import os
import subprocess
import threading
import webbrowser
import math
from pydub import AudioSegment
from pydub.utils import mediainfo
from OpenTranslator.translator import CustomTranslator
import unicodedata
import librosa

current_dir = os.path.dirname(os.path.abspath(__file__))
# Initialize the translator instance with an output directory
output_dir = os.path.join(current_dir, "output")

translator_instance = CustomTranslator(output_dir=output_dir)

# Define the languages dictionary
languages = {
    "English": "en",
    "Spanish": "es",
    "French": "fr",
    "German": "de",
    "Japanese": "ja",
    "Korean": "ko",
    "Turkish": "tr",
    "Arabic": "ar",
    "Russian": "ru",
    "Hebrew": "he",
    "Hindi": "hi",
    "Italian": "it",
    "Portuguese": "pt",
    "Chinese (Mandarin)": "zh",
    "Czech": "cs",
    "Dutch": "nl",
    "Polish": "pl"
}

language_choices = [(lang, code) for lang, code in languages.items()]

# Define the translation options
TextTranslationOption = ["Local"]

# Handle file uploads
def upload_file(file):
    global audio_path
    audio_path = file.name

# Run the translation process
def run_translation(translation_method, target_lang):
    output_path = os.path.join(output_dir, f"{os.path.splitext(os.path.basename(audio_path))[0]}_translated.mp3")
    input_file = audio_path
    print(audio_path)
    input_duration = get_audio_duration(input_file)
    print('input_duration: ' + str(input_duration))
    if input_duration > 30:
        max_chunk_duration = 30
        # Round up so the final partial chunk is translated rather than dropped
        num_chunks = math.ceil(input_duration / max_chunk_duration)
        chunk_files = []
        Translation_chunk_files = []
        translated_text = []

        for chunk_idx in range(num_chunks):
            print('duration more than 30 - num_chunks: ' + str(num_chunks))
            print('duration more than 30 - chunk_idx: ' + str(chunk_idx))
            start_time = chunk_idx * max_chunk_duration
            end_time = min((chunk_idx + 1) * max_chunk_duration, input_duration)
            chunk_output_path = os.path.join(output_dir, f"{os.path.splitext(os.path.basename(output_path))[0]}_chunk{chunk_idx + 1}.wav")

            split_audio_chunk(input_file, chunk_output_path, start_time, end_time)

            try:
                translation_result = translator_instance.process_audio_chunk(chunk_output_path,
                                                                             target_lang,
                                                                             chunk_idx, output_path, translation_method)
            except Exception as e:
                print(f"{e}")
                # Return a message and no audio so both Gradio outputs are populated
                return "An error occurred!", None

            translated_text.append(translation_result)

            chunk_files.append(chunk_output_path)
            Translation_chunk_output_path = os.path.join(output_dir, f"{os.path.splitext(os.path.basename(output_path))[0]}_Translation_chunk{chunk_idx + 1}.wav")

            Translation_chunk_files.append(Translation_chunk_output_path)

        final_output_path = os.path.join(output_dir, f"{os.path.splitext(os.path.basename(output_path))[0]}-temp.wav")

        if translation_method == 'Local':
            merge_audio_files(Translation_chunk_files, final_output_path)

        # Re-encode the merged WAV as MP3
        subprocess.run(['ffmpeg', '-i', final_output_path, '-codec:a', 'libmp3lame', output_path], check=True)
        os.remove(final_output_path)

        delete_chunk_files(chunk_files)
        delete_chunk_files(Translation_chunk_files)

        translation_result = ', '.join(translated_text)
        return translation_result, output_path

    if input_duration <= 30 and translation_method == 'Local':
        chunk_output_path = input_file
        chunk_idx = 0
        print('duration less than 30')
        try:
            translation_result = translator_instance.process_audio_chunk(chunk_output_path,
                                                                         target_lang,
                                                                         chunk_idx, output_path, translation_method)
        except Exception as e:
            print(f"{e}")
            return "An error occurred!", None

        Translation_chunk_output_path = os.path.join(output_dir, f"{os.path.splitext(os.path.basename(output_path))[0]}_Translation_chunk1.wav")

        subprocess.run(['ffmpeg', '-i', Translation_chunk_output_path, '-codec:a', 'libmp3lame', output_path], check=True)
        os.remove(Translation_chunk_output_path)

        return translation_result, output_path
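
# Worked example of the chunking arithmetic in run_translation, for a hypothetical
# 75 s input: math.ceil(75 / 30) = 3 chunks covering 0-30 s, 30-60 s and 60-75 s;
# rounding up keeps the trailing partial chunk instead of silently discarding it.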

# Split audio into a chunk using ffmpeg
def split_audio_chunk(input_path, output_path, start_time, end_time):
    ffmpeg_cmd = f'ffmpeg -i "{input_path}" -ss {start_time} -to {end_time} -c copy "{output_path}"'
    subprocess.call(ffmpeg_cmd, shell=True)

# Get the duration of an audio file in seconds
def get_audio_duration(file_path):
    return librosa.get_duration(filename=file_path)

# Merge translated audio chunks back into a single file
def merge_audio_files(input_files, output_file):
    merged_audio = AudioSegment.silent(duration=0)
    for input_file in input_files:
        try:
            chunk_audio = AudioSegment.from_file(input_file, format="wav")
            merged_audio += chunk_audio
        except Exception as e:
            print(f"Error merging audio file {input_file}: {e}")
    merged_audio.export(output_file, format="wav")

# Delete temporary chunk files
def delete_chunk_files(files):
    for file in files:
        try:
            os.remove(file)
        except Exception as e:
            print(f"Error deleting file {file}: {e}")

def upload_audio(audio_file):
    return audio_file

# Define the Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# Open Translator")

    with gr.Row():
        with gr.Column():
            translation_method = gr.Dropdown(choices=TextTranslationOption, value=TextTranslationOption[0], label="Translation Method")

            gr.Markdown("## Select Audio File:")
            audio_file = gr.File(type="filepath", label="Upload Audio File")
            audio_player = gr.Audio(label="Audio Player", interactive=True)

            audio_file.upload(upload_file, audio_file)
            audio_file.change(upload_audio, audio_file, audio_player)

            gr.Markdown("## Select Target Language:")
            target_lang = gr.Dropdown(choices=language_choices, value="ar", label="Target Language")
            translate_button = gr.Button("Translate")

        with gr.Column():
            translated_text = gr.Textbox(label="Translated text", lines=20, interactive=False)
            audio_output = gr.Audio(label="Translated Audio")
            translate_button.click(run_translation, inputs=[translation_method, target_lang], outputs=[translated_text, audio_output])

demo.launch(server_name="127.0.0.2", server_port=7861)
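
When this module is run, Gradio serves the interface at http://127.0.0.2:7861. Because run_translation reads the module-level audio_path set by upload_file, an audio file must be uploaded before clicking Translate. A minimal sketch of exercising the same entry points without the UI (FakeUpload and speech.wav are hypothetical):

class FakeUpload:  # stand-in for the upload payload Gradio hands to upload_file
    name = "speech.wav"  # hypothetical local recording

upload_file(FakeUpload())
text, mp3_path = run_translation("Local", "fr")
print(text, mp3_path)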
requirements.txt
@@ -16,4 +16,5 @@ cutlet
 mecab
 unidic-lite
 google-api-python-client
-gtts
+gtts
+gradio
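
In the hunk above, gtts is removed and re-added because the previous final line lacked a trailing newline; the substantive change is the new gradio dependency for the web app.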