Files
encoach_backend/ielts_be/services/impl/exam/speaking/grade.py
2024-12-21 19:27:14 +00:00

342 lines
14 KiB
Python

import asyncio
import os
import uuid
from logging import getLogger
from typing import Dict, List
import aiofiles
from ielts_be.configs.constants import GPTModels, TemperatureSettings, FilePaths
from ielts_be.dtos.speaking import GradeSpeakingItem
from ielts_be.helpers import TextHelper
from ielts_be.repositories import IFileStorage
from ielts_be.services import ILLMService, ISpeechToTextService
class GradeSpeaking:
    """Grade IELTS speaking answers from recorded audio.

    The grading flow is: store each uploaded recording in a temporary file,
    transcribe it with the speech-to-text service, ask the LLM to grade the
    transcribed answer(s), attach model ("perfect") answers and corrected
    transcripts, upload the recordings to file storage and finally remove the
    temporary files.
    """

    def __init__(self, llm: ILLMService, file_storage: IFileStorage, stt: ISpeechToTextService):
        """Store the collaborating services and create a module-level logger.

        Args:
            llm: Service used for every LLM request (grading, perfect answers,
                transcript corrections).
            file_storage: Storage used to upload the recordings.
            stt: Speech-to-text service used to transcribe the recordings.
        """
        self._llm = llm
        self._file_storage = file_storage
        self._stt = stt
        self._logger = getLogger(__name__)

    async def grade_speaking_task(self, task: int, items: List[GradeSpeakingItem]) -> Dict:
        """Transcribe and grade the answers of one speaking task.

        Args:
            task: Speaking part number. Parts 1 and 3 are handled as a set of
                question/answer pairs; any other value is handled as a single
                question/answer (only ``items[0]`` is used for the prompt).
            items: Answers to grade; each item provides ``question`` and an
                ``answer`` object with an awaitable ``read()``.

        Returns:
            Either the zero rating built by ``_zero_rating`` (no recording
            could be transcribed, or an answer did not pass the 20-word
            check), or the grading response of the LLM extended with:

            * parts 1 and 3: ``perfect_answer_N``, ``transcript_N`` and
              ``fixed_text_N`` per item (plus ``error_N`` for items whose
              transcription failed) and ``answer`` (the uploaded URLs);
            * otherwise: ``transcript``, ``fixed_text``, ``perfect_answer``
              and ``fullPath`` (first uploaded URL);
            * always: ``solutions`` (uploaded URLs, ``""`` for failed
              uploads) and a sanity-checked ``overall``.

        Raises:
            Exception: Errors while saving the recordings, merging the
                transcription segments or calling the LLM are propagated.
                Transcription and upload failures are handled per item and
                do not raise.
        """
        request_id = str(uuid.uuid4())
        self._log(task, request_id, f"Received request to grade speaking task {task}.")

        if task != 2:
            self._log(task, request_id, f'Received {len(items)} total answers.')

        temp_files = []
        try:
            # Save all files first. Exceptions are collected instead of raised
            # immediately so that every file that WAS written is registered
            # for cleanup before the first error is re-raised.
            saved = await asyncio.gather(
                *[self.save_file(item) for item in items],
                return_exceptions=True
            )
            temp_files = [outcome for outcome in saved if isinstance(outcome, str)]
            for outcome in saved:
                if isinstance(outcome, BaseException):
                    raise outcome

            # Process all transcriptions concurrently; a failure of one
            # recording must not abort the others.
            self._log(task, request_id, 'Starting batch transcription')
            text_transcription_segments = await asyncio.gather(*[
                self._stt.speech_to_text(file_path)
                for file_path in temp_files
            ], return_exceptions=True)

            successful_transcriptions = []
            failed_indices = []
            successful_indices = []
            for i, result in enumerate(text_transcription_segments):
                if isinstance(result, list):
                    successful_transcriptions.append(result)
                    successful_indices.append(i)
                else:
                    # Anything that is not a list of segments (an exception or
                    # an unexpected value) counts as a failed transcription so
                    # that every item ends up in exactly one of the two lists.
                    self._log(task, request_id, f'Transcription failed for exercise {i + 1}: {str(result)}')
                    failed_indices.append(i)

            if not successful_indices:
                # Nothing to grade: do not spend LLM calls on an empty prompt.
                self._log(task, request_id, 'No recording could be transcribed.')
                return self._zero_rating("The audio recording failed to be transcribed.")

            text_answers = await asyncio.gather(*[
                self._stt.fix_overlap(self._llm, answer_segments)
                for answer_segments in successful_transcriptions
            ])

            for answer in text_answers:
                self._log(task, request_id, f'Transcribed answer: {answer}')
                # A single answer failing the 20-word check zeroes the whole task.
                if not TextHelper.has_x_words(answer, 20):
                    self._log(
                        task, request_id,
                        f'The answer had less words than threshold 20 to be graded. Answer: {answer}'
                    )
                    return self._zero_rating("The audio recorded does not contain enough english words to be graded.")

            # Perfect answers are requested for every item, including the ones
            # whose transcription failed, so the list is indexed by item position.
            self._log(task, request_id, 'Requesting perfect answers')
            perfect_answers = await asyncio.gather(*[
                self._get_perfect_answer(task, item.question)
                for item in items
            ])

            # Format the responses
            if task in {1, 3}:
                self._log(task, request_id, 'Formatting answers and questions for prompt.')
                formatted_text = "".join(
                    f"**Question {orig_idx + 1}:**\n{items[orig_idx].question}\n\n"
                    f"**Answer {orig_idx + 1}:**\n{text_answers[success_idx]}\n\n"
                    for success_idx, orig_idx in enumerate(successful_indices)
                )
                self._log(task, request_id, f'Formatted answers and questions for prompt: {formatted_text}')
                questions_and_answers = f'\n\n The questions and answers are: \n\n{formatted_text}'
            else:
                questions_and_answers = f'\n Question: "{items[0].question}" \n Answer: "{text_answers[0]}"'

            self._log(task, request_id, 'Requesting grading of the answer(s).')
            response = await self._grade_task(task, questions_and_answers)
            self._log(task, request_id, f'Answer(s) graded: {response}')

            if task in {1, 3}:
                self._log(task, request_id, 'Adding perfect answer(s) to response.')
                # The corrections are independent LLM calls, so run them concurrently.
                fixed_texts = await asyncio.gather(*[
                    self._get_speaking_corrections(answer)
                    for answer in text_answers
                ])

                # text_answers / fixed_texts are indexed by success position,
                # perfect_answers by the original item position.
                for success_idx, orig_idx in enumerate(successful_indices):
                    response['perfect_answer_' + str(orig_idx + 1)] = perfect_answers[orig_idx]
                    response['transcript_' + str(orig_idx + 1)] = text_answers[success_idx]
                    response['fixed_text_' + str(orig_idx + 1)] = fixed_texts[success_idx]

                # Add empty strings for failed transcriptions but keep perfect answers
                for failed_idx in failed_indices:
                    response['perfect_answer_' + str(failed_idx + 1)] = perfect_answers[failed_idx]
                    response['transcript_' + str(failed_idx + 1)] = ""
                    response['fixed_text_' + str(failed_idx + 1)] = ""
                    response[f'error_{failed_idx + 1}'] = f"Transcription failed for exercise {failed_idx + 1}"
            else:
                response['transcript'] = text_answers[0]
                response['fixed_text'] = await self._get_speaking_corrections(text_answers[0])
                response['perfect_answer'] = perfect_answers[0]["answer"] if perfect_answers else ""

            solutions = await self._upload_recordings(task, request_id, temp_files, failed_indices)

            response["overall"] = self._fix_speaking_overall(response["overall"], response["task_response"])
            response["solutions"] = solutions
            if task in {1, 3}:
                response["answer"] = solutions
            else:
                response["fullPath"] = solutions[0]

            self._log(task, request_id, f'Final response: {response}')
            return response
        finally:
            self._remove_temp_files(task, request_id, temp_files)

    async def _upload_recordings(self, task: int, request_id: str, temp_files: List[str],
                                 failed_indices: List[int]) -> List[str]:
        """Upload every temporary recording and return one URL per recording.

        Recordings whose transcription failed are stored under the
        failed-transcription path (named after the request and exercise
        number); all others get a random name under the speaking path. A
        failed upload is logged and represented by ``""`` so the returned
        list stays aligned with ``temp_files``.
        """
        failed = set(failed_indices)
        solutions = []
        for i, file_name in enumerate(temp_files):
            if i in failed:
                path = f'{FilePaths.FIREBASE_FAILED_TRANSCRIPTION_FILES_PATH}_grading_{request_id}_ex_{i + 1}.wav'
            else:
                path = f'{FilePaths.FIREBASE_SPEAKING_VIDEO_FILES_PATH}{uuid.uuid4()}.wav'
            try:
                solution_url = await self._file_storage.upload_file_firebase_get_url(path, file_name)
            except Exception as e:
                # Best effort: one failed upload must not discard the grading.
                self._log(task, request_id, f'Failed to upload file {i + 1}: {str(e)}')
                solutions.append("")
            else:
                solutions.append(solution_url)
        return solutions

    def _remove_temp_files(self, task: int, request_id: str, temp_files: List[str]):
        """Delete the temporary recordings; problems are logged, never raised."""
        for file_path in temp_files:
            try:
                os.remove(file_path)
            except FileNotFoundError:
                # Already gone, nothing to clean up.
                continue
            except Exception as e:
                self._log(task, request_id, f'Error cleaning up temp file {file_path}: {str(e)}')

    def _log(self, task: int, request_id: str, message: str):
        """Write an info log line prefixed with the task number and request id."""
        self._logger.info(f'POST - speaking_task_{task} - {request_id} - {message}')

    async def _get_perfect_answer(self, task: int, question: str):
        """Ask the LLM for a model answer to ``question`` of speaking part ``task``.

        Part 1 adds a length instruction (2 or 3 sentences) and uses
        ``GPTModels.GPT_4_O``; every other part uses ``GPTModels.GPT_3_5_TURBO``.

        Returns:
            Whatever ``ILLMService.prediction`` returns; the prompt requests
            JSON of the form ``{"answer": ...}``.
        """
        messages = [
            {
                "role": "system",
                "content": (
                    'You are a helpful assistant designed to output JSON on this format: {"answer": "perfect answer"}'
                )
            },
            {
                "role": "user",
                "content": (
                    'Provide a perfect answer according to ielts grading system to the following '
                    f'Speaking Part {task} question: "{question}"'
                )
            }
        ]

        if task == 1:
            messages.append({
                "role": "user",
                "content": 'The answer must be 2 or 3 sentences long.'
            })

        gpt_model = GPTModels.GPT_4_O if task == 1 else GPTModels.GPT_3_5_TURBO

        return await self._llm.prediction(
            gpt_model, messages, ["answer"], TemperatureSettings.GRADING_TEMPERATURE
        )

    async def _grade_task(self, task: int, questions_and_answers: str) -> Dict:
        """Ask the LLM to grade the given question/answer text.

        Args:
            task: Speaking part number; selects the task-specific instruction.
            questions_and_answers: Pre-formatted text appended to the grading
                instruction.

        Returns:
            The result of ``ILLMService.prediction``; the prompt requests JSON
            shaped like ``_grade_template()``.

        Raises:
            KeyError: If ``task`` is not 1, 2 or 3.
        """
        messages = [
            {
                "role": "system",
                "content": (
                    f'You are a helpful assistant designed to output JSON on this format: {self._grade_template()}'
                )
            },
            {
                "role": "user",
                "content": (
                    f'Evaluate the given Speaking Part {task} response based on the IELTS grading system, ensuring a '
                    'strict assessment that penalizes errors. Deduct points for deviations from the task, and '
                    'assign a score of 0 if the response fails to address the question. Additionally, provide '
                    'detailed commentary highlighting both strengths and weaknesses in the response.'
                ) + questions_and_answers
            }
        ]

        task_specific = {
            "1": (
                'Address the student as "you". If the answers are not 2 or 3 sentences long, warn the '
                'student that they should be.'
            ),
            "2": 'Address the student as "you"',
            "3": 'Address the student as "you" and pay special attention to coherence between the answers.'
        }

        messages.append({
            "role": "user",
            "content": task_specific[str(task)]
        })

        if task in {1, 3}:
            messages.extend([
                {
                    "role": "user",
                    "content": (
                        'For pronunciations act as if you heard the answers and they were transcribed '
                        'as you heard them.'
                    )
                },
                {
                    "role": "user",
                    "content": 'The comments must be long, detailed, justify the grading and suggest improvements.'
                }
            ])

        return await self._llm.prediction(
            GPTModels.GPT_4_O, messages, ["comment"], TemperatureSettings.GRADING_TEMPERATURE
        )

    @staticmethod
    def _fix_speaking_overall(overall: float, task_response: dict):
        """Return an overall grade that is consistent with the category grades.

        If ``overall`` lies outside the range spanned by the category grades
        it is replaced by their average rounded to a whole number with
        ``round``; otherwise it is returned unchanged. With no category
        grades there is nothing to compare against, so ``overall`` is
        returned as is.
        """
        grades = [category["grade"] for category in task_response.values()]
        if not grades:
            return overall

        if overall > max(grades) or overall < min(grades):
            return round(sum(grades) / len(grades), 0)

        return overall

    @staticmethod
    def _zero_rating(comment: str):
        """Build a grading response with every grade set to zero and the given comment."""
        return {
            "comment": comment,
            "overall": 0,
            "task_response": {
                "Fluency and Coherence": {
                    "grade": 0.0,
                    "comment": ""
                },
                "Lexical Resource": {
                    "grade": 0.0,
                    "comment": ""
                },
                "Grammatical Range and Accuracy": {
                    "grade": 0.0,
                    "comment": ""
                },
                "Pronunciation": {
                    "grade": 0.0,
                    "comment": ""
                }
            }
        }

    async def _get_speaking_corrections(self, text):
        """Ask the LLM to fix errors in a transcription and return the fixed text.

        Returns:
            The ``"fixed_text"`` entry of the LLM response.
        """
        messages = [
            {
                "role": "system",
                "content": (
                    'You are a helpful assistant designed to output JSON on this format: '
                    '{"fixed_text": "fixed transcription with no misspelling errors"}'
                )
            },
            {
                "role": "user",
                "content": (
                    'Fix the errors in the provided transcription and put it in a JSON. '
                    f'Do not complete the answer, only replace what is wrong. \n The text: "{text}"'
                )
            }
        ]

        # NOTE(review): the meaning of the trailing positional ``False`` is
        # defined by ILLMService.prediction, which is not visible here.
        response = await self._llm.prediction(
            GPTModels.GPT_3_5_TURBO,
            messages,
            ["fixed_text"],
            0.2,
            False
        )
        return response["fixed_text"]

    @staticmethod
    def _grade_template():
        """Return the response shape that is embedded in the grading system prompt."""
        return {
            "comment": "extensive comment about answer quality",
            "overall": 0.0,
            "task_response": {
                "Fluency and Coherence": {
                    "grade": 0.0,
                    "comment": (
                        "extensive comment about fluency and coherence, use examples to justify the grade awarded."
                    )
                },
                "Lexical Resource": {
                    "grade": 0.0,
                    "comment": "extensive comment about lexical resource, use examples to justify the grade awarded."
                },
                "Grammatical Range and Accuracy": {
                    "grade": 0.0,
                    "comment": (
                        "extensive comment about grammatical range and accuracy, use examples to justify the "
                        "grade awarded."
                    )
                },
                "Pronunciation": {
                    "grade": 0.0,
                    "comment": (
                        "extensive comment about pronunciation on the transcribed answer, use examples to justify the "
                        "grade awarded."
                    )
                }
            }
        }

    @staticmethod
    async def save_file(item: GradeSpeakingItem) -> str:
        """Write the uploaded answer of ``item`` to a new file under ``tmp/``.

        Returns:
            The relative path of the written file (``tmp/<uuid4>``).
        """
        # The relative tmp directory may not exist yet in a fresh working directory.
        os.makedirs("tmp", exist_ok=True)
        sound_file_name = "tmp/" + str(uuid.uuid4())
        content = await item.answer.read()
        async with aiofiles.open(sound_file_name, 'wb') as f:
            await f.write(content)
        return sound_file_name