"""AWS Polly implementation of the text-to-speech service interface."""
import random
from typing import Union

import aiofiles
from aiobotocore.client import BaseClient

from app.configs.constants import NeuralVoices
from app.services.abc import ITextToSpeechService


class AWSPolly(ITextToSpeechService):
    """Text-to-speech backend built on AWS Polly via an async (aiobotocore) client.

    Converts plain text or a multi-voice "conversation" into a single audio
    file, appending a spoken end-of-recording marker.
    """

    def __init__(self, client: BaseClient):
        # Pre-configured async Polly client; lifecycle is owned by the caller.
        self._client = client

    async def synthesize_speech(
        self,
        text: str,
        voice: str,
        engine: str = "neural",
        output_format: str = "mp3",
    ) -> bytes:
        """Synthesize ``text`` with the given Polly voice and return raw audio bytes.

        Args:
            text: Plain text to synthesize. Must fit Polly's per-request text
                limit — see ``_divide_text`` for chunking longer inputs.
            voice: Polly ``VoiceId`` (e.g. ``"Stephen"``).
            engine: Polly synthesis engine (``"neural"`` or ``"standard"``).
            output_format: Audio container format (``"mp3"``, ``"ogg_vorbis"``, ...).

        Returns:
            The synthesized audio as ``bytes``.
        """
        tts_response = await self._client.synthesize_speech(
            Engine=engine,
            Text=text,
            OutputFormat=output_format,
            VoiceId=voice,
        )
        # Close the streaming body deterministically so the underlying HTTP
        # connection is released even if the read fails (it previously leaked).
        async with tts_response['AudioStream'] as stream:
            return await stream.read()

    async def text_to_speech(
        self,
        text: Union[list[dict], str],
        file_name: str,
        closing_message: str = "This audio recording, for the listening exercise, has finished.",
        closing_voice: str = "Stephen",
    ) -> None:
        """Render ``text`` to speech and save the combined audio to ``file_name``.

        Args:
            text: Either a plain string (spoken by one random neural voice) or
                a conversation — a list of ``{"text": ..., "voice": ...}`` dicts,
                each segment spoken by its own voice.
            file_name: Path of the output audio file.
            closing_message: Spoken end-of-recording marker appended to the audio.
            closing_voice: Polly voice used for the closing message.

        Raises:
            ValueError: If ``text`` is neither a string nor a list.
        """
        if isinstance(text, str):
            audio_segments = await self._text_to_speech(text)
        elif isinstance(text, list):
            audio_segments = await self._conversation_to_speech(text)
        else:
            raise ValueError("Unsupported argument for text_to_speech")

        # Add finish message
        audio_segments.append(
            await self.synthesize_speech(closing_message, closing_voice)
        )

        # Combine the audio segments into a single audio file. MP3 frames can
        # be concatenated naively, so joining the raw bytes yields a playable file.
        combined_audio = b"".join(audio_segments)

        # Save the combined audio to a single file
        async with aiofiles.open(file_name, "wb") as f:
            await f.write(combined_audio)

        print("Speech segments saved to " + file_name)

    async def _text_to_speech(self, text: str) -> list[bytes]:
        """Synthesize a single text with one randomly chosen neural voice.

        The text is split into Polly-sized chunks first; every chunk uses the
        same voice so the result sounds like one continuous speaker.
        """
        voice = random.choice(NeuralVoices.ALL_NEURAL_VOICES)['Id']
        audio_segments = []
        for part in self._divide_text(text):
            audio_segments.append(await self.synthesize_speech(part, voice))
        return audio_segments

    async def _conversation_to_speech(self, conversation: list) -> list[bytes]:
        """Synthesize each conversation segment with its own voice.

        Each element of ``conversation`` is expected to be a dict with
        ``"text"`` and ``"voice"`` keys.
        """
        audio_segments = []
        for segment in conversation:
            audio_segments.append(
                await self.synthesize_speech(segment["text"], segment["voice"])
            )
        return audio_segments

    @staticmethod
    def _divide_text(text: str, max_length: int = 3000) -> list[str]:
        """Split ``text`` into chunks of at most ``max_length`` characters.

        Chunks preferentially break after the last ``'.'`` within range so
        sentences are not cut mid-way; if no period is found in the window,
        the chunk is split hard at ``max_length``.
        """
        if len(text) <= max_length:
            return [text]

        divisions = []
        current_position = 0

        while current_position < len(text):
            next_position = min(current_position + max_length, len(text))
            next_period_position = text.rfind('.', current_position, next_position)

            if next_period_position != -1 and next_period_position > current_position:
                divisions.append(text[current_position:next_period_position + 1])
                current_position = next_period_position + 1
            else:
                # If no '.' found in the next chunk, split at max_length
                divisions.append(text[current_position:next_position])
                current_position = next_position

        return divisions
|