Files
encoach_backend/ielts_be/services/impl/third_parties/aws_polly.py
Carlos-Mesquita 9bfad2d47f ENCOA-295
2024-12-26 12:31:22 +00:00

89 lines
3.2 KiB
Python

import random
from typing import Optional
from aiobotocore.client import BaseClient
from ielts_be.dtos.listening import Dialog
from ielts_be.services import ITextToSpeechService
from ielts_be.configs.constants import NeuralVoices
class AWSPolly(ITextToSpeechService):
    """Text-to-speech service that delegates synthesis to an async client.

    Monologues are cut into chunks of bounded length before synthesis, and
    conversations are synthesized one entry at a time. The resulting audio
    segments are concatenated byte-wise into a single payload.
    """

    def __init__(self, client: BaseClient):
        """Keep a reference to the client used for every synthesis request."""
        self._client = client

    async def synthesize_speech(
        self,
        text: str,
        voice: str,
        engine: str = "neural",
        output_format: str = "mp3",
    ):
        """Synthesize a single piece of text and return the raw audio bytes.

        Args:
            text: Text forwarded as the ``Text`` request parameter.
            voice: Voice identifier forwarded as ``VoiceId``.
            engine: Forwarded as ``Engine``.
            output_format: Forwarded as ``OutputFormat``.

        Returns:
            Everything read from the response's ``AudioStream``.
        """
        tts_response = await self._client.synthesize_speech(
            Engine=engine,
            Text=text,
            OutputFormat=output_format,
            VoiceId=voice,
        )
        audio_stream = tts_response['AudioStream']
        # Enter the stream as a context manager so the underlying HTTP
        # connection is released even when the read raises.
        async with audio_stream:
            return await audio_stream.read()

    async def text_to_speech(self, dialog: Dialog, include_final_clue: bool = True) -> bytes:
        """Turn a dialog into one concatenated audio payload.

        A non-empty ``dialog.conversation`` takes precedence; otherwise
        ``dialog.monologue`` is used.

        Args:
            dialog: Object exposing ``conversation`` and ``monologue``.
            include_final_clue: When true, a closing announcement is
                synthesized and appended after the dialog audio.

        Returns:
            The audio segments joined into a single ``bytes`` object. The
            segments are concatenated as-is; nothing is re-encoded.

        Raises:
            ValueError: If both ``conversation`` and ``monologue`` are empty.
        """
        if not dialog.conversation and not dialog.monologue:
            raise ValueError("Unsupported argument for text_to_speech")

        if dialog.conversation:
            audio_segments = await self._conversation_to_speech(dialog)
        else:
            audio_segments = await self._text_to_speech(dialog.monologue)

        if include_final_clue:
            # Closing announcement appended after the dialog audio.
            final_message = await self.synthesize_speech(
                "This audio recording, for the listening exercise, has finished.",
                "Stephen",
            )
            audio_segments.append(final_message)

        return b"".join(audio_segments)

    async def _text_to_speech(self, text: str) -> list:
        """Synthesize a monologue, chunk by chunk, with one random voice.

        The voice is drawn once from ``NeuralVoices.ALL_NEURAL_VOICES`` so
        that every chunk of the same text is spoken by the same voice.

        Returns:
            A list with one audio ``bytes`` item per chunk, in text order.
        """
        voice = random.choice(NeuralVoices.ALL_NEURAL_VOICES)['Id']
        # Awaited one at a time on purpose: keeps request order and avoids
        # firing all requests at the service concurrently.
        return [
            await self.synthesize_speech(part, voice)
            for part in self._divide_text(text)
        ]

    async def _conversation_to_speech(self, dialog: Dialog) -> list:
        """Synthesize every conversation entry with the voice it specifies.

        Each entry of ``dialog.conversation`` must expose ``text`` and
        ``voice``.

        Returns:
            A list with one audio ``bytes`` item per entry, in order.
        """
        return [
            await self.synthesize_speech(convo_payload.text, convo_payload.voice)
            for convo_payload in dialog.conversation
        ]

    @staticmethod
    def _divide_text(text, max_length=3000):
        """Split ``text`` into consecutive chunks of at most ``max_length``.

        Chunks end right after the last ``'.'`` found inside the current
        window when there is one; otherwise the window is cut at
        ``max_length``. Once the remaining text fits in a single chunk it is
        emitted whole. Joining the chunks always reproduces ``text``.

        Args:
            text: The text to split.
            max_length: Maximum number of characters per chunk.

        Returns:
            The list of chunks, in order. Text that already fits is returned
            as a single-element list.

        Raises:
            ValueError: If ``text`` needs splitting and ``max_length`` is
                smaller than 1 (the cursor could never advance).
        """
        text_length = len(text)
        if text_length <= max_length:
            return [text]
        if max_length < 1:
            raise ValueError("max_length must be a positive integer")

        divisions = []
        current_position = 0
        while current_position < text_length:
            # The tail fits in one request: do not split it any further.
            if text_length - current_position <= max_length:
                divisions.append(text[current_position:])
                break

            next_position = current_position + max_length
            period_position = text.rfind('.', current_position, next_position)
            # Prefer ending the chunk on a sentence boundary; a period sitting
            # exactly at the chunk start is ignored, as is "not found" (-1).
            if period_position > current_position:
                next_position = period_position + 1

            divisions.append(text[current_position:next_position])
            current_position = next_position
        return divisions