import random
from typing import List

from aiobotocore.client import BaseClient

from ielts_be.dtos.listening import Dialog
from ielts_be.services import ITextToSpeechService
from ielts_be.configs.constants import NeuralVoices


class AWSPolly(ITextToSpeechService):
    """Text-to-speech service that renders audio through an AWS Polly client."""

    # Closing announcement appended after every generated recording.
    _FINAL_MESSAGE = "This audio recording, for the listening exercise, has finished."
    _FINAL_MESSAGE_VOICE = "Stephen"

    def __init__(self, client: BaseClient):
        """Store the client used for all ``synthesize_speech`` requests."""
        self._client = client

    async def synthesize_speech(
        self,
        text: str,
        voice: str,
        engine: str = "neural",
        output_format: str = "mp3",
    ) -> bytes:
        """Synthesize ``text`` with the given voice and return the audio payload.

        Args:
            text: Text sent as the ``Text`` request parameter.
            voice: Voice identifier sent as ``VoiceId``.
            engine: Value sent as ``Engine``.
            output_format: Value sent as ``OutputFormat``.

        Returns:
            The full content read from the response's ``AudioStream``.
        """
        tts_response = await self._client.synthesize_speech(
            Engine=engine,
            Text=text,
            OutputFormat=output_format,
            VoiceId=voice,
        )
        # Reading inside the stream's async context guarantees the underlying
        # HTTP connection is released even if the read fails part-way.
        async with tts_response['AudioStream'] as stream:
            return await stream.read()

    async def text_to_speech(self, dialog: Dialog) -> bytes:
        """Render a dialog to one audio payload, followed by a closing message.

        The conversation is used when present; otherwise the monologue is
        spoken with a single randomly chosen voice.

        Raises:
            ValueError: If the dialog has neither a conversation nor a
                monologue.
        """
        if not dialog.conversation and not dialog.monologue:
            raise ValueError("Unsupported argument for text_to_speech")

        if dialog.conversation:
            audio_segments = await self._conversation_to_speech(dialog)
        else:
            audio_segments = await self._text_to_speech(dialog.monologue)

        # The closing announcement is always the last segment.
        audio_segments.append(
            await self.synthesize_speech(self._FINAL_MESSAGE, self._FINAL_MESSAGE_VOICE)
        )

        # Segments are concatenated as raw bytes into a single payload.
        return b"".join(audio_segments)

    async def _text_to_speech(self, text: str) -> List[bytes]:
        """Synthesize ``text`` chunk by chunk using one randomly picked voice."""
        voice = random.choice(NeuralVoices.ALL_NEURAL_VOICES)['Id']
        # Requests are awaited one at a time, in order, so the resulting
        # segments keep the order of the text.
        return [
            await self.synthesize_speech(part, voice)
            for part in self._divide_text(text)
        ]

    async def _conversation_to_speech(self, dialog: Dialog) -> List[bytes]:
        """Synthesize every conversation entry with that entry's own voice."""
        return [
            await self.synthesize_speech(convo_payload.text, convo_payload.voice)
            for convo_payload in dialog.conversation
        ]

    @staticmethod
    def _divide_text(text: str, max_length: int = 3000) -> List[str]:
        """Split ``text`` into consecutive chunks of at most ``max_length`` chars.

        Chunks end right after the last ``'.'`` found within the window when
        there is one; otherwise the text is cut at ``max_length``. A remainder
        that already fits is emitted as a single chunk instead of being split
        again at its last period. Joining the chunks reproduces ``text``.

        Args:
            text: Text to split.
            max_length: Maximum chunk size in characters.
                NOTE(review): the default presumably mirrors the per-request
                text limit of the speech service; confirm.

        Returns:
            The list of chunks, in order.

        Raises:
            ValueError: If ``max_length`` is not positive while the text still
                needs splitting (this would otherwise never terminate).
        """
        if len(text) <= max_length:
            return [text]
        if max_length <= 0:
            raise ValueError("max_length must be a positive integer")

        divisions = []
        text_length = len(text)
        current_position = 0
        while current_position < text_length:
            # Whatever is left fits in one request: emit it whole.
            if text_length - current_position <= max_length:
                divisions.append(text[current_position:])
                break

            next_position = current_position + max_length
            next_period_position = text.rfind('.', current_position, next_position)
            if next_period_position > current_position:
                # Keep the period with the sentence it terminates.
                split_at = next_period_position + 1
            else:
                # No usable '.' in this window, so split at max_length.
                split_at = next_position

            divisions.append(text[current_position:split_at])
            current_position = split_at

        return divisions