import asyncio
import os
import uuid
from logging import getLogger
from typing import Dict, List

import aiofiles

from ielts_be.configs.constants import GPTModels, TemperatureSettings, FilePaths
from ielts_be.dtos.speaking import GradeSpeakingItem
from ielts_be.helpers import TextHelper
from ielts_be.repositories import IFileStorage
from ielts_be.services import ILLMService, ISpeechToTextService


class GradeSpeaking:
    """Use case for grading IELTS speaking task recordings.

    Orchestrates the full pipeline: persist uploaded audio to temp files,
    transcribe them, ask the LLM for model ("perfect") answers and a graded
    evaluation, attach corrected transcripts, upload the recordings to file
    storage, and return one combined response dict.
    """

    def __init__(self, llm: ILLMService, file_storage: IFileStorage, stt: ISpeechToTextService):
        """Store the injected collaborators and a module-scoped logger.

        :param llm: LLM service used for grading, perfect answers and corrections.
        :param file_storage: storage backend used to upload the audio files.
        :param stt: speech-to-text service used for transcription.
        """
        self._llm = llm
        self._file_storage = file_storage
        self._stt = stt
        self._logger = getLogger(__name__)

    async def grade_speaking_task(self, task: int, items: List[GradeSpeakingItem]) -> Dict:
        """Grade one speaking task (1, 2 or 3) from the uploaded answers.

        Tasks 1 and 3 handle multiple question/answer pairs and produce
        per-exercise keys (``perfect_answer_N``, ``transcript_N``,
        ``fixed_text_N``); task 2 handles a single answer and produces the
        singular keys (``perfect_answer``, ``transcript``, ``fixed_text``,
        ``fullPath``). Returns a zero rating early when an answer is too
        short or (task 2) transcription failed entirely.

        :param task: speaking task number; branches on 2 vs. {1, 3}.
        :param items: uploaded answers, each carrying a question and an audio file.
        :return: the grading response dict assembled from the LLM output.
        """
        request_id = str(uuid.uuid4())
        self._log(task, request_id, f"Received request to grade speaking task {task}.")
        if task != 2:
            self._log(task, request_id, f'Received {len(items)} total answers.')
        temp_files = []
        try:
            # Save all uploaded audio files to local temp paths first.
            temp_files = await asyncio.gather(*[
                self.save_file(item) for item in items
            ])
            # Transcribe all files concurrently. NOTE(review): an earlier
            # comment said "up to 4", but gather() imposes no concurrency
            # limit here — any cap would have to live inside the STT service.
            # return_exceptions=True keeps per-file failures from aborting
            # the whole batch; failures are collected below.
            self._log(task, request_id, 'Starting batch transcription')
            text_transcription_segments = await asyncio.gather(*[
                self._stt.speech_to_text(file_path) for file_path in temp_files
            ], return_exceptions=True)
            # Split results into successes and failures, remembering the
            # original exercise index of each so keys line up later.
            successful_transcriptions = []
            failed_indices = []
            successful_indices = []
            for i, result in enumerate(text_transcription_segments):
                if isinstance(result, Exception):
                    self._log(task, request_id, f'Transcription failed for exercise {i + 1}: {str(result)}')
                    failed_indices.append(i)
                elif isinstance(result, list):
                    # Successful transcriptions arrive as a list of segments.
                    successful_transcriptions.append(result)
                    successful_indices.append(i)
            # Merge each answer's segments into one text (de-duplicating
            # overlap) — ordered the same as successful_indices.
            text_answers = await asyncio.gather(*[
                self._stt.fix_overlap(self._llm, answer_segments) for answer_segments in successful_transcriptions
            ])
            for answer in text_answers:
                self._log(task, request_id, f'Transcribed answer: {answer}')
                # Reject the whole submission if any answer is under 20 words.
                if not TextHelper.has_x_words(answer, 20):
                    self._log(
                        task, request_id,
                        f'The answer had less words than threshold 20 to be graded. Answer: {answer}'
                    )
                    return self._zero_rating("The audio recorded does not contain enough english words to be graded.")
            # Request a model answer for every question (including ones whose
            # transcription failed — they are still shown to the student).
            self._log(task, request_id, 'Requesting perfect answers')
            perfect_answers = await asyncio.gather(*[
                self._get_perfect_answer(task, item.question) for item in items
            ])
            # Build the Q&A text that is embedded into the grading prompt.
            if task in {1, 3}:
                self._log(task, request_id, 'Formatting answers and questions for prompt.')
                formatted_text = ""
                # success_idx indexes text_answers; orig_idx is the original
                # exercise position used for question lookup and numbering.
                for success_idx, orig_idx in enumerate(successful_indices):
                    formatted_text += f"**Question {orig_idx + 1}:**\n{items[orig_idx].question}\n\n"
                    formatted_text += f"**Answer {orig_idx + 1}:**\n{text_answers[success_idx]}\n\n"
                self._log(task, request_id, f'Formatted answers and questions for prompt: {formatted_text}')
                questions_and_answers = f'\n\n The questions and answers are: \n\n{formatted_text}'
            else:
                if len(text_answers) > 0:
                    questions_and_answers = f'\n Question: "{items[0].question}" \n Answer: "{text_answers[0]}"'
                else:
                    # Task 2 has a single recording; with no transcript there
                    # is nothing to grade.
                    return self._zero_rating("The audio recording failed to be transcribed.")
            self._log(task, request_id, 'Requesting grading of the answer(s).')
            response = await self._grade_task(task, questions_and_answers)
            self._log(task, request_id, f'Answer(s) graded: {response}')
            if task in {1, 3}:
                self._log(task, request_id, 'Adding perfect answer(s) to response.')
                # Per-exercise keys for successfully transcribed answers.
                # Keys are numbered by the ORIGINAL exercise index (orig_idx),
                # while text_answers is indexed by success_idx.
                for success_idx, orig_idx in enumerate(successful_indices):
                    response['perfect_answer_' + str(orig_idx + 1)] = perfect_answers[
                        orig_idx]  # perfect_answers is per original exercise
                    response['transcript_' + str(orig_idx + 1)] = text_answers[success_idx]
                    response['fixed_text_' + str(orig_idx + 1)] = await self._get_speaking_corrections(
                        text_answers[success_idx])
                # Failed transcriptions still get their perfect answer, plus
                # empty transcript/fixed_text and an explicit error key.
                for failed_idx in failed_indices:
                    response['perfect_answer_' + str(failed_idx + 1)] = perfect_answers[
                        failed_idx]
                    response['transcript_' + str(failed_idx + 1)] = ""
                    response['fixed_text_' + str(failed_idx + 1)] = ""
                    response[f'error_{failed_idx + 1}'] = f"Transcription failed for exercise {failed_idx + 1}"
            else:
                # Task 2: singular keys. NOTE(review): here perfect_answer is
                # unwrapped via ["answer"], unlike the {1, 3} branch which
                # stores the raw prediction dict — confirm this asymmetry is
                # intended by the API consumers.
                response['transcript'] = text_answers[0] if text_answers else ""
                response['fixed_text'] = await self._get_speaking_corrections(text_answers[0]) if text_answers else ""
                response['perfect_answer'] = perfect_answers[0]["answer"] if perfect_answers else ""
            # Upload every recording; failed transcriptions go to a separate
            # storage path for later inspection. Upload errors are logged and
            # replaced with "" so one bad upload doesn't fail the grading.
            solutions = []
            for i, file_name in enumerate(temp_files):
                try:
                    if i not in failed_indices:
                        path = f'{FilePaths.FIREBASE_SPEAKING_VIDEO_FILES_PATH}{uuid.uuid4()}.wav'
                    else:
                        path = f'{FilePaths.FIREBASE_FAILED_TRANSCRIPTION_FILES_PATH}_grading_{request_id}_ex_{i + 1}.wav'
                    solution_url = await self._file_storage.upload_file_firebase_get_url(path, file_name)
                    solutions.append(solution_url)
                except Exception as e:
                    self._log(task, request_id, f'Failed to upload file {i + 1}: {str(e)}')
                    solutions.append("")
            # Clamp/recompute the overall grade from the per-category grades.
            response["overall"] = self._fix_speaking_overall(response["overall"], response["task_response"])
            response["solutions"] = solutions
            if task in {1, 3}:
                response["answer"] = solutions
            else:
                response["fullPath"] = solutions[0]
            self._log(task, request_id, f'Final response: {response}')
            return response
        finally:
            # Best-effort cleanup of the local temp files on every exit path,
            # including the early zero-rating returns above.
            for file_path in temp_files:
                try:
                    if os.path.exists(file_path):
                        os.remove(file_path)
                except Exception as e:
                    self._log(task, request_id, f'Error cleaning up temp file {file_path}: {str(e)}')

    def _log(self, task: int, request_id: str, message: str) -> None:
        """Log with a consistent per-request prefix (task number + request id)."""
        self._logger.info(f'POST - speaking_task_{task} - {request_id} - {message}')

    async def _get_perfect_answer(self, task: int, question: str):
        """Ask the LLM for a model answer to one speaking question.

        Task 1 answers are constrained to 2–3 sentences and use the stronger
        GPT-4o model; other tasks use GPT-3.5 Turbo.

        :return: the LLM prediction parsed against the ["answer"] key
                 (shape defined by ILLMService.prediction).
        """
        messages = [
            {
                "role": "system",
                "content": (
                    'You are a helpful assistant designed to output JSON on this format: {"answer": "perfect answer"}'
                )
            },
            {
                "role": "user",
                "content": (
                    'Provide a perfect answer according to ielts grading system to the following '
                    f'Speaking Part {task} question: "{question}"'
                )
            }
        ]
        if task == 1:
            messages.append({
                "role": "user",
                "content": 'The answer must be 2 or 3 sentences long.'
            })
        gpt_model = GPTModels.GPT_4_O if task == 1 else GPTModels.GPT_3_5_TURBO
        return await self._llm.prediction(
            gpt_model, messages, ["answer"], TemperatureSettings.GRADING_TEMPERATURE
        )

    async def _grade_task(self, task: int, questions_and_answers: str) -> Dict:
        """Ask the LLM to grade the formatted questions-and-answers text.

        Builds a strict-grading prompt around the JSON template from
        _grade_template(), adds a task-specific instruction, and (for tasks
        1 and 3) extra instructions about pronunciation and comment depth.

        :param questions_and_answers: pre-formatted Q&A block appended to the prompt.
        :return: the LLM prediction validated against the ["comment"] key.
        """
        messages = [
            {
                "role": "system",
                "content": (
                    f'You are a helpful assistant designed to output JSON on this format: {self._grade_template()}'
                )
            },
            {
                "role": "user",
                "content": (
                    f'Evaluate the given Speaking Part {task} response based on the IELTS grading system, ensuring a '
                    'strict assessment that penalizes errors. Deduct points for deviations from the task, and '
                    'assign a score of 0 if the response fails to address the question. Additionally, provide '
                    'detailed commentary highlighting both strengths and weaknesses in the response.'
                ) + questions_and_answers
            }
        ]
        # Per-task prompt addendum, keyed by the task number as a string.
        task_specific = {
            "1": (
                'Address the student as "you". If the answers are not 2 or 3 sentences long, warn the '
                'student that they should be.'
            ),
            "2": 'Address the student as "you"',
            "3": 'Address the student as "you" and pay special attention to coherence between the answers.'
        }
        messages.append({
            "role": "user",
            "content": task_specific[str(task)]
        })
        if task in {1, 3}:
            messages.extend([
                {
                    "role": "user",
                    "content": (
                        'For pronunciations act as if you heard the answers and they were transcribed '
                        'as you heard them.'
                    )
                },
                {
                    "role": "user",
                    "content": 'The comments must be long, detailed, justify the grading and suggest improvements.'
                }
            ])
        return await self._llm.prediction(
            GPTModels.GPT_4_O, messages, ["comment"], TemperatureSettings.GRADING_TEMPERATURE
        )

    @staticmethod
    def _fix_speaking_overall(overall: float, task_response: dict):
        """Sanity-check the overall grade against the per-category grades.

        If the LLM's overall score falls outside the [min, max] range of the
        category grades, replace it with the category average rounded to the
        nearest whole number; otherwise keep it unchanged.

        :param task_response: mapping of category name -> {"grade": ..., "comment": ...}.
        """
        grades = [category["grade"] for category in task_response.values()]
        if overall > max(grades) or overall < min(grades):
            total_sum = sum(grades)
            average = total_sum / len(grades)
            rounded_average = round(average, 0)
            return rounded_average
        return overall

    @staticmethod
    def _zero_rating(comment: str):
        """Build a response dict with 0.0 in every category and the given comment.

        Used as the early-return payload when an answer is too short or could
        not be transcribed.
        """
        return {
            "comment": comment,
            "overall": 0,
            "task_response": {
                "Fluency and Coherence": {
                    "grade": 0.0,
                    "comment": ""
                },
                "Lexical Resource": {
                    "grade": 0.0,
                    "comment": ""
                },
                "Grammatical Range and Accuracy": {
                    "grade": 0.0,
                    "comment": ""
                },
                "Pronunciation": {
                    "grade": 0.0,
                    "comment": ""
                }
            }
        }

    async def _get_speaking_corrections(self, text):
        """Ask the LLM to fix misspellings in a transcription.

        Replacement-only: the prompt forbids completing/extending the answer.

        :param text: raw transcription text to correct.
        :return: the "fixed_text" field of the LLM prediction.
        """
        messages = [
            {
                "role": "system",
                "content": (
                    'You are a helpful assistant designed to output JSON on this format: '
                    '{"fixed_text": "fixed transcription with no misspelling errors"}'
                )
            },
            {
                "role": "user",
                "content": (
                    'Fix the errors in the provided transcription and put it in a JSON. '
                    f'Do not complete the answer, only replace what is wrong. \n The text: "{text}"'
                )
            }
        ]
        # Low temperature (0.2) for deterministic corrections; the final
        # False flag is passed positionally to ILLMService.prediction —
        # its meaning is defined by that interface.
        response = await self._llm.prediction(
            GPTModels.GPT_3_5_TURBO, messages, ["fixed_text"], 0.2, False
        )
        return response["fixed_text"]

    @staticmethod
    def _grade_template():
        """Return the JSON response template embedded in the grading system prompt."""
        return {
            "comment": "extensive comment about answer quality",
            "overall": 0.0,
            "task_response": {
                "Fluency and Coherence": {
                    "grade": 0.0,
                    "comment": (
                        "extensive comment about fluency and coherence, use examples to justify the grade awarded."
                    )
                },
                "Lexical Resource": {
                    "grade": 0.0,
                    "comment": "extensive comment about lexical resource, use examples to justify the grade awarded."
                },
                "Grammatical Range and Accuracy": {
                    "grade": 0.0,
                    "comment": (
                        "extensive comment about grammatical range and accuracy, use examples to justify the "
                        "grade awarded."
                    )
                },
                "Pronunciation": {
                    "grade": 0.0,
                    "comment": (
                        "extensive comment about pronunciation on the transcribed answer, use examples to justify the "
                        "grade awarded."
                    )
                }
            }
        }

    @staticmethod
    async def save_file(item: GradeSpeakingItem) -> str:
        """Persist an uploaded answer's audio to a uniquely-named temp file.

        NOTE(review): assumes a "tmp/" directory exists relative to the
        working directory — aiofiles.open will raise if it does not; confirm
        it is created at deployment.

        :param item: upload whose ``answer`` exposes an async ``read()``.
        :return: the path of the written temp file.
        """
        sound_file_name = "tmp/" + str(uuid.uuid4())
        content = await item.answer.read()
        async with aiofiles.open(sound_file_name, 'wb') as f:
            await f.write(content)
        return sound_file_name