129 lines
4.1 KiB
Python
129 lines
4.1 KiB
Python
import whisper
|
|
import os
|
|
import nltk
|
|
import boto3
|
|
import random
|
|
|
|
nltk.download('words')
|
|
from nltk.corpus import words
|
|
from helper.constants import *
|
|
|
|
|
|
def speech_to_text(file_path):
    """Transcribe the audio file at ``file_path`` with Whisper.

    Loads the Whisper "base" model and runs ``transcribe`` on the file with
    ``fp16=False``, ``language='English'`` and ``verbose=False``.

    Args:
        file_path: Path of the audio file to transcribe.

    Returns:
        The ``"text"`` entry of the result returned by ``model.transcribe``.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist. This is a subclass
            of ``Exception``, so callers that catch ``Exception`` still work.
    """
    # Guard clause: fail early, before the model is loaded.
    if not os.path.exists(file_path):
        print("File not found:", file_path)
        # f-string (instead of "+" concatenation) also accepts path-like
        # objects; the message text itself is unchanged.
        raise FileNotFoundError(f"File {file_path} not found.")

    # NOTE(review): the model is loaded again on every call; consider loading
    # it once at a higher level if this function is called frequently.
    model = whisper.load_model("base")
    result = model.transcribe(file_path, fp16=False, language='English', verbose=False)
    return result["text"]
|
|
|
|
|
|
def text_to_speech(text: str, file_name: str):
    """Synthesize ``text`` with Amazon Polly and save it as a single MP3 file.

    The text is split with ``divide_text`` and every part is synthesized with
    the neural engine, using one voice chosen at random from
    ``ALL_NEURAL_VOICES``. A fixed closing announcement spoken by the
    "Stephen" voice is appended. The bytes of all responses are concatenated
    and written to ``file_name``.

    Args:
        text: The text to convert to speech.
        file_name: Path of the output file (opened in binary write mode).
    """
    polly = boto3.client(
        'polly',
        region_name='eu-west-1',
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    )

    # Picked once, before the loop, so every part uses the same voice.
    chosen_voice = random.choice(ALL_NEURAL_VOICES)['Id']

    def synthesize(utterance, voice_id):
        # One Polly request; returns the full audio stream as bytes.
        reply = polly.synthesize_speech(
            Engine="neural",
            Text=utterance,
            OutputFormat="mp3",
            VoiceId=voice_id,
        )
        return reply['AudioStream'].read()

    mp3_chunks = [synthesize(part, chosen_voice) for part in divide_text(text)]

    # Closing announcement, always spoken by the same fixed voice.
    mp3_chunks.append(
        synthesize(
            "This audio recording, for the listening exercise, has finished.",
            "Stephen",
        )
    )

    # All requests have succeeded at this point; only now touch the file.
    payload = b"".join(mp3_chunks)
    with open(file_name, "wb") as out_file:
        out_file.write(payload)

    print("Speech segments saved to " + file_name)
|
|
|
|
|
|
def conversation_text_to_speech(conversation: list, file_name: str):
    """Synthesize a multi-voice conversation with Amazon Polly into one MP3.

    Each item of ``conversation`` is read through its ``"text"`` and
    ``"voice"`` keys and synthesized with the neural engine in order. A fixed
    closing announcement spoken by the "Stephen" voice is appended, and the
    bytes of all responses are concatenated and written to ``file_name``.

    Args:
        conversation: Sequence of mappings with ``"text"`` and ``"voice"``
            entries, one per spoken segment.
        file_name: Path of the output file (opened in binary write mode).
    """
    polly = boto3.client(
        'polly',
        region_name='eu-west-1',
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    )

    # Audio bytes are accumulated in arrival order.
    recording = bytearray()

    for turn in conversation:
        reply = polly.synthesize_speech(
            Engine="neural",
            Text=turn["text"],
            OutputFormat="mp3",
            VoiceId=turn["voice"],
        )
        recording += reply['AudioStream'].read()

    # Closing announcement, always spoken by the same fixed voice.
    closing_reply = polly.synthesize_speech(
        Engine="neural",
        Text="This audio recording, for the listening exercise, has finished.",
        OutputFormat="mp3",
        VoiceId="Stephen",
    )
    recording += closing_reply['AudioStream'].read()

    # All requests have succeeded at this point; only now touch the file.
    with open(file_name, "wb") as out_file:
        out_file.write(bytes(recording))

    print("Speech segments saved to " + file_name)
|
|
|
|
|
|
def has_words(text: str):
    """Return True if ``text`` contains at least one known English word.

    The text is split on whitespace and each token is looked up, lowercased,
    in the NLTK ``words`` corpus. A token is also tried with leading and
    trailing punctuation removed, so that ordinary prose such as
    ``"Hello, world."`` is recognised ("hello," alone is not in the corpus).

    Args:
        text: The text to inspect.

    Returns:
        True when at least one token matches a corpus word, otherwise False
        (including for empty or whitespace-only text).
    """
    from string import punctuation  # local import: only needed here

    # NOTE(review): the vocabulary set is rebuilt on every call; consider
    # caching it at a higher level if this is called frequently.
    english_words = set(words.words())

    def is_known(token):
        lowered = token.lower()
        # Raw lookup first keeps every match the previous version produced;
        # the stripped lookup adds tokens that only carry punctuation.
        return lowered in english_words or lowered.strip(punctuation) in english_words

    return any(is_known(token) for token in text.split())
|
|
|
|
|
|
def has_x_words(text: str, quantity):
    """Return True if ``text`` contains at least ``quantity`` English words.

    The text is split on whitespace and each token is looked up, lowercased,
    in the NLTK ``words`` corpus. A token is also tried with leading and
    trailing punctuation removed, so that words followed by a comma or a
    full stop are still counted. Repeated words count once per occurrence.

    Args:
        text: The text to inspect.
        quantity: Minimum number of matching tokens required.

    Returns:
        True when the number of matching tokens is ``>= quantity``.
    """
    from string import punctuation  # local import: only needed here

    # NOTE(review): the vocabulary set is rebuilt on every call; consider
    # caching it at a higher level if this is called frequently.
    english_words = set(words.words())

    def is_known(token):
        lowered = token.lower()
        # Raw lookup first keeps every match the previous version produced;
        # the stripped lookup adds tokens that only carry punctuation.
        return lowered in english_words or lowered.strip(punctuation) in english_words

    english_word_count = sum(1 for token in text.split() if is_known(token))
    return english_word_count >= quantity
|
|
|
|
|
|
def divide_text(text, max_length=3000):
    """Split ``text`` into consecutive chunks of at most ``max_length`` characters.

    Chunks end just after the last ``'.'`` found inside the current window
    when there is one; otherwise the window is cut at exactly ``max_length``
    characters. Once the remaining text fits in a single chunk it is kept
    whole instead of being split again at a period. Joining the returned
    chunks always reproduces ``text``.

    Args:
        text: The string to split.
        max_length: Maximum number of characters per chunk.

    Returns:
        A list of chunks. If ``text`` already fits, the list is ``[text]``.

    Raises:
        ValueError: If ``text`` does not fit in one chunk and ``max_length``
            is less than 1 (the split could never make progress).
    """
    if len(text) <= max_length:
        return [text]

    # Without this guard a non-positive max_length would loop forever,
    # because the position would never advance.
    if max_length < 1:
        raise ValueError("max_length must be a positive integer")

    divisions = []
    current_position = 0
    text_length = len(text)

    while current_position < text_length:
        window_end = current_position + max_length

        # The remainder fits in one chunk: keep it whole rather than cutting
        # it at an inner period and emitting a needless trailing fragment.
        if window_end >= text_length:
            divisions.append(text[current_position:])
            break

        last_period = text.rfind('.', current_position, window_end)

        # rfind returns -1 when nothing is found; a period sitting exactly at
        # current_position is ignored so a chunk is never just ".".
        if last_period > current_position:
            split_at = last_period + 1
        else:
            # No usable '.' in this window: hard split at max_length.
            split_at = window_end

        divisions.append(text[current_position:split_at])
        current_position = split_at

    return divisions
|