import asyncio
import os
import uuid
from logging import getLogger
from typing import Dict, List

import aiofiles

from ielts_be.configs.constants import GPTModels, TemperatureSettings, FilePaths
from ielts_be.dtos.speaking import GradeSpeakingItem
from ielts_be.helpers import TextHelper
from ielts_be.repositories import IFileStorage
from ielts_be.services import ILLMService, ISpeechToTextService


class GradeSpeaking:
    """Grade recorded answers to IELTS speaking tasks.

    For each request the recorded answers are written to temporary files,
    transcribed through the speech-to-text service, graded by the LLM service,
    enriched with model ("perfect") answers and corrected transcripts, and the
    audio is uploaded through the file storage. Temporary files are removed
    before the call returns, whether it succeeds or fails.
    """

    def __init__(self, llm: ILLMService, file_storage: IFileStorage, stt: ISpeechToTextService):
        self._llm = llm
        self._file_storage = file_storage
        self._stt = stt
        self._logger = getLogger(__name__)

    async def grade_speaking_task(self, task: int, items: List[GradeSpeakingItem]) -> Dict:
        """Grade the answers given for one speaking task.

        Args:
            task: Speaking part number. Parts 1 and 3 are handled as a list of
                question/answer pairs; any other value takes the single-answer
                path and only uses ``items[0]``.
            items: The questions together with their recorded answers.

        Returns:
            The grading dictionary produced by the LLM service, extended with
            transcripts, corrected texts, perfect answers and the uploaded
            audio URLs. If any transcription fails the ``has_x_words(answer, 20)``
            check, a zero rating (see ``_zero_rating``) is returned instead and
            no grading request is made.
        """
        request_id = str(uuid.uuid4())
        self._log(task, request_id, f"Received request to grade speaking task {task}.")

        if task != 2:
            self._log(task, request_id, f'Received {len(items)} total answers.')

        temp_files: List[str] = []
        try:
            # Save all files first. Exceptions are collected instead of raised
            # immediately so that every save has finished and every file that
            # was written is tracked for cleanup before a failure propagates.
            save_results = await asyncio.gather(
                *[self.save_file(item) for item in items],
                return_exceptions=True,
            )
            temp_files = [result for result in save_results if not isinstance(result, BaseException)]
            for result in save_results:
                if isinstance(result, BaseException):
                    raise result

            # Transcribe all answers concurrently (no concurrency cap is applied here).
            self._log(task, request_id, 'Starting batch transcription')
            text_answers = await asyncio.gather(*[
                self._stt.speech_to_text(file_path) for file_path in temp_files
            ])

            for answer in text_answers:
                self._log(task, request_id, f'Transcribed answer: {answer}')
                if not TextHelper.has_x_words(answer, 20):
                    self._log(
                        task, request_id,
                        f'The answer had less words than threshold 20 to be graded. Answer: {answer}'
                    )
                    return self._zero_rating(
                        "The audio recorded does not contain enough english words to be graded."
                    )

            # Get perfect answers
            self._log(task, request_id, 'Requesting perfect answers')
            perfect_answers = await asyncio.gather(*[
                self._get_perfect_answer(task, item.question) for item in items
            ])

            # Format the responses
            if task in {1, 3}:
                self._log(task, request_id, 'Formatting answers and questions for prompt.')
                formatted_text = self._format_questions_and_answers(items, text_answers)
                self._log(task, request_id, f'Formatted answers and questions for prompt: {formatted_text}')
                questions_and_answers = f'\n\n The questions and answers are: \n\n{formatted_text}'
            else:
                questions_and_answers = f'\n Question: "{items[0].question}" \n Answer: "{text_answers[0]}"'

            self._log(task, request_id, 'Requesting grading of the answer(s).')
            response = await self._grade_task(task, questions_and_answers)
            self._log(task, request_id, f'Answer(s) graded: {response}')

            if task in {1, 3}:
                self._log(task, request_id, 'Adding perfect answer(s) to response.')

                # TODO: check if it is answer["answer"] instead
                # (the single-answer branch below stores perfect_answers[0]["answer"]).
                for i, answer in enumerate(perfect_answers, start=1):
                    response['perfect_answer_' + str(i)] = answer

                self._log(task, request_id, 'Getting speaking corrections in parallel')
                # Get all corrections in parallel
                fixed_texts = await asyncio.gather(*[
                    self._get_speaking_corrections(answer) for answer in text_answers
                ])

                self._log(task, request_id, 'Adding transcript and fixed texts to response.')
                for i, (answer, fixed) in enumerate(zip(text_answers, fixed_texts), start=1):
                    response['transcript_' + str(i)] = answer
                    response['fixed_text_' + str(i)] = fixed
            else:
                response['transcript'] = text_answers[0]

                self._log(task, request_id, 'Requesting fixed text.')
                response['fixed_text'] = await self._get_speaking_corrections(text_answers[0])
                self._log(task, request_id, f'Fixed text: {response["fixed_text"]}')

                response['perfect_answer'] = perfect_answers[0]["answer"]

            # Uploads are awaited one at a time, in the same order as the answers.
            solutions = []
            for file_name in temp_files:
                solutions.append(
                    await self._file_storage.upload_file_firebase_get_url(
                        f'{FilePaths.FIREBASE_SPEAKING_VIDEO_FILES_PATH}{uuid.uuid4()}.wav',
                        file_name
                    )
                )

            response["overall"] = self._fix_speaking_overall(response["overall"], response["task_response"])
            response["solutions"] = solutions
            if task in {1, 3}:
                response["answer"] = solutions
            else:
                response["fullPath"] = solutions[0]

            self._log(task, request_id, f'Final response: {response}')
            return response
        finally:
            self._remove_temp_files(task, request_id, temp_files)

    @staticmethod
    def _format_questions_and_answers(items: List[GradeSpeakingItem], text_answers: List[str]) -> str:
        """Render numbered question/answer pairs as the text block used in the grading prompt."""
        return "".join(
            f"**Question {i}:**\n{item.question}\n\n"
            f"**Answer {i}:**\n{transcribed_answer}\n\n"
            for i, (item, transcribed_answer) in enumerate(zip(items, text_answers), start=1)
        )

    def _remove_temp_files(self, task: int, request_id: str, file_paths: List[str]) -> None:
        """Best-effort removal of temporary files; failures are logged, never raised."""
        for file_path in file_paths:
            try:
                os.remove(file_path)
            except FileNotFoundError:
                # Already gone: nothing to clean up.
                continue
            except Exception as e:
                self._log(task, request_id, f'Error cleaning up temp file {file_path}: {str(e)}')

    def _log(self, task: int, request_id: str, message: str) -> None:
        """Log ``message`` at info level, prefixed with the task number and request id."""
        self._logger.info(f'POST - speaking_task_{task} - {request_id} - {message}')

    async def _get_perfect_answer(self, task: int, question: str):
        """Ask the LLM service for a model answer to ``question``.

        The result is returned exactly as ``ILLMService.prediction`` produces it;
        the prompt requests the shape ``{"answer": ...}``.
        """
        messages = [
            {
                "role": "system",
                "content": (
                    'You are a helpful assistant designed to output JSON on this format: {"answer": "perfect answer"}'
                )
            },
            {
                "role": "user",
                "content": (
                    'Provide a perfect answer according to ielts grading system to the following '
                    f'Speaking Part {task} question: "{question}"'
                )
            }
        ]

        if task == 1:
            messages.append({
                "role": "user",
                "content": 'The answer must be 2 or 3 sentences long.'
            })

        gpt_model = GPTModels.GPT_4_O if task == 1 else GPTModels.GPT_3_5_TURBO

        # NOTE(review): the third argument is presumably the list of fields
        # expected in the JSON reply - confirm against ILLMService.prediction.
        return await self._llm.prediction(
            gpt_model, messages, ["answer"], TemperatureSettings.GRADING_TEMPERATURE
        )

    async def _grade_task(self, task: int, questions_and_answers: str) -> Dict:
        """Ask the LLM service to grade the given question/answer text.

        Args:
            task: Speaking part number; must be 1, 2 or 3, otherwise the lookup
                of the task-specific instruction raises ``KeyError``.
            questions_and_answers: Pre-formatted text appended to the grading prompt.

        Returns:
            Whatever ``ILLMService.prediction`` returns; the prompt requests the
            shape described by ``_grade_template``.
        """
        messages = [
            {
                "role": "system",
                "content": (
                    f'You are a helpful assistant designed to output JSON on this format: {self._grade_template()}'
                )
            },
            {
                "role": "user",
                "content": (
                    f'Evaluate the given Speaking Part {task} response based on the IELTS grading system, ensuring a '
                    'strict assessment that penalizes errors. Deduct points for deviations from the task, and '
                    'assign a score of 0 if the response fails to address the question. Additionally, provide '
                    'detailed commentary highlighting both strengths and weaknesses in the response.'
                ) + questions_and_answers
            }
        ]

        task_specific = {
            "1": (
                'Address the student as "you". If the answers are not 2 or 3 sentences long, warn the '
                'student that they should be.'
            ),
            "2": 'Address the student as "you"',
            "3": 'Address the student as "you" and pay special attention to coherence between the answers.'
        }

        messages.append({
            "role": "user",
            "content": task_specific[str(task)]
        })

        if task in {1, 3}:
            messages.extend([
                {
                    "role": "user",
                    "content": (
                        'For pronunciations act as if you heard the answers and they were transcribed '
                        'as you heard them.'
                    )
                },
                {
                    "role": "user",
                    "content": 'The comments must be long, detailed, justify the grading and suggest improvements.'
                }
            ])

        return await self._llm.prediction(
            GPTModels.GPT_4_O, messages, ["comment"], TemperatureSettings.GRADING_TEMPERATURE
        )

    @staticmethod
    def _fix_speaking_overall(overall: float, task_response: dict):
        """Keep the overall grade within the range of the per-category grades.

        If ``overall`` lies outside ``[min(grades), max(grades)]`` it is replaced
        by the mean of the category grades rounded to a whole number; otherwise
        it is returned unchanged. With no categories there is nothing to compare
        against, so ``overall`` is returned as is.
        """
        grades = [category["grade"] for category in task_response.values()]
        if not grades:
            # max()/min() would raise ValueError on an empty sequence.
            return overall

        if overall > max(grades) or overall < min(grades):
            average = sum(grades) / len(grades)
            return round(average, 0)

        return overall

    @staticmethod
    def _zero_rating(comment: str) -> Dict:
        """Build a grading result with every grade set to zero and the given overall comment."""
        return {
            "comment": comment,
            "overall": 0,
            "task_response": {
                "Fluency and Coherence": {
                    "grade": 0.0,
                    "comment": ""
                },
                "Lexical Resource": {
                    "grade": 0.0,
                    "comment": ""
                },
                "Grammatical Range and Accuracy": {
                    "grade": 0.0,
                    "comment": ""
                },
                "Pronunciation": {
                    "grade": 0.0,
                    "comment": ""
                }
            }
        }

    async def _get_speaking_corrections(self, text):
        """Ask the LLM service for a corrected version of the transcription ``text``.

        Returns the ``"fixed_text"`` entry of the LLM reply.
        """
        messages = [
            {
                "role": "system",
                "content": (
                    'You are a helpful assistant designed to output JSON on this format: '
                    '{"fixed_text": "fixed transcription with no misspelling errors"}'
                )
            },
            {
                "role": "user",
                "content": (
                    'Fix the errors in the provided transcription and put it in a JSON. '
                    f'Do not complete the answer, only replace what is wrong. \n The text: "{text}"'
                )
            }
        ]

        # 0.2 sits in the position where the other calls pass a temperature.
        # NOTE(review): the meaning of the trailing False is not visible here -
        # confirm against ILLMService.prediction.
        response = await self._llm.prediction(
            GPTModels.GPT_3_5_TURBO,
            messages,
            ["fixed_text"],
            0.2,
            False
        )
        return response["fixed_text"]

    @staticmethod
    def _grade_template() -> Dict:
        """Return the example grading structure that is embedded in the grading system prompt."""
        return {
            "comment": "extensive comment about answer quality",
            "overall": 0.0,
            "task_response": {
                "Fluency and Coherence": {
                    "grade": 0.0,
                    "comment": (
                        "extensive comment about fluency and coherence, use examples to justify the grade awarded."
                    )
                },
                "Lexical Resource": {
                    "grade": 0.0,
                    "comment": "extensive comment about lexical resource, use examples to justify the grade awarded."
                },
                "Grammatical Range and Accuracy": {
                    "grade": 0.0,
                    "comment": (
                        "extensive comment about grammatical range and accuracy, use examples to justify the "
                        "grade awarded."
                    )
                },
                "Pronunciation": {
                    "grade": 0.0,
                    "comment": (
                        "extensive comment about pronunciation on the transcribed answer, use examples to justify the "
                        "grade awarded."
                    )
                }
            }
        }

    @staticmethod
    async def save_file(item: GradeSpeakingItem) -> str:
        """Write the recorded answer of ``item`` to a new file under ``tmp/`` and return its path."""
        # The relative "tmp" directory may not exist yet (e.g. on a fresh deployment).
        os.makedirs("tmp", exist_ok=True)
        sound_file_name = "tmp/" + str(uuid.uuid4())
        content = await item.answer.read()
        async with aiofiles.open(sound_file_name, 'wb') as f:
            await f.write(content)
        return sound_file_name