# Source file: encoach_backend/ielts_be/services/impl/exam/speaking/grade.py
# Listing metadata: 317 lines, 12 KiB, Python

import asyncio
import os
import uuid
from logging import getLogger
from typing import Dict, List
import aiofiles
from ielts_be.configs.constants import GPTModels, TemperatureSettings, FilePaths
from ielts_be.dtos.speaking import GradeSpeakingItem
from ielts_be.helpers import TextHelper
from ielts_be.repositories import IFileStorage
from ielts_be.services import ILLMService, ISpeechToTextService
class GradeSpeaking:
    """Grade recorded speaking answers for speaking tasks 1, 2 and 3.

    The public entry point is :meth:`grade_speaking_task`. It stores the
    uploaded recordings as temporary files, transcribes them through the
    speech-to-text service, asks the LLM service for a grading, model
    ("perfect") answers and corrected transcripts, uploads the recordings
    through the file storage and finally removes the temporary files.
    """

    def __init__(self, llm: "ILLMService", file_storage: "IFileStorage", stt: "ISpeechToTextService"):
        """Store the collaborating services and create the module logger.

        Args:
            llm: Service whose ``prediction`` coroutine is used for grading,
                perfect answers and transcript corrections.
            file_storage: Storage used to upload the recorded answers.
            stt: Service whose ``speech_to_text`` coroutine transcribes a file.
        """
        self._llm = llm
        self._file_storage = file_storage
        self._stt = stt
        self._logger = getLogger(__name__)

    async def grade_speaking_task(self, task: int, items: List["GradeSpeakingItem"]) -> Dict:
        """Grade the answers of one speaking task.

        Args:
            task: Speaking part number. Parts 1 and 3 are handled as a list of
                question/answer pairs; any other value takes the
                single-answer path, which only uses ``items[0]``.
            items: Answers to grade; each item exposes ``question`` and an
                ``answer`` object with an awaitable ``read()``.

        Returns:
            The grading dictionary produced by the LLM service, extended with
            transcripts, corrected texts, perfect answers and the uploaded
            recording URLs. A zero rating (see :meth:`_zero_rating`) is
            returned instead when no answers are given or when any transcript
            does not pass the 20-word check.

        Raises:
            Exception: Whatever the storage, speech-to-text or LLM services
                raise is propagated after the temporary files are removed.
        """
        request_id = str(uuid.uuid4())
        self._log(task, request_id, f"Received request to grade speaking task {task}.")
        if task != 2:
            self._log(task, request_id, f'Received {len(items)} total answers.')

        # Without answers there is nothing to transcribe or grade: the
        # single-answer path would fail on items[0] and the multi-answer path
        # would send an empty prompt to the LLM.
        if not items:
            self._log(task, request_id, 'No answers were provided, returning a zero rating.')
            return self._zero_rating("No answers were provided to be graded.")

        # Every temp file that has been fully written, in completion order.
        # Tracked separately from the ordered gather result so that cleanup
        # still covers the files written before a sibling save failed or the
        # request was cancelled.
        created_files: List[str] = []

        async def save_and_track(item) -> str:
            path = await self.save_file(item)
            created_files.append(path)
            return path

        try:
            # Save all files first. return_exceptions=True makes gather wait
            # for every save, so no save is still writing when cleanup runs.
            outcomes = await asyncio.gather(
                *[save_and_track(item) for item in items],
                return_exceptions=True
            )
            failure = next((o for o in outcomes if isinstance(o, BaseException)), None)
            if failure is not None:
                raise failure
            temp_files = list(outcomes)  # same order as ``items``

            # Transcribe all recordings concurrently.
            # NOTE(review): an earlier comment said "up to 4"; no such limit
            # is enforced in this method - confirm where it is applied.
            self._log(task, request_id, 'Starting batch transcription')
            text_answers = await asyncio.gather(*[
                self._stt.speech_to_text(file_path)
                for file_path in temp_files
            ])

            # A single transcript failing the word check rejects the whole
            # submission.
            for answer in text_answers:
                self._log(task, request_id, f'Transcribed answer: {answer}')
                if not TextHelper.has_x_words(answer, 20):
                    self._log(
                        task, request_id,
                        f'The answer had less words than threshold 20 to be graded. Answer: {answer}'
                    )
                    return self._zero_rating("The audio recorded does not contain enough english words to be graded.")

            # Get perfect answers
            self._log(task, request_id, 'Requesting perfect answers')
            perfect_answers = await asyncio.gather(*[
                self._get_perfect_answer(task, item.question)
                for item in items
            ])

            # Format the responses
            multi_answer = task in {1, 3}
            if multi_answer:
                self._log(task, request_id, 'Formatting answers and questions for prompt.')
                formatted_text = "".join(
                    f"**Question {i}:**\n{item.question}\n\n"
                    f"**Answer {i}:**\n{transcribed_answer}\n\n"
                    for i, (item, transcribed_answer) in enumerate(zip(items, text_answers), start=1)
                )
                self._log(task, request_id, f'Formatted answers and questions for prompt: {formatted_text}')
                questions_and_answers = f'\n\n The questions and answers are: \n\n{formatted_text}'
            else:
                questions_and_answers = f'\n Question: "{items[0].question}" \n Answer: "{text_answers[0]}"'

            self._log(task, request_id, 'Requesting grading of the answer(s).')
            response = await self._grade_task(task, questions_and_answers)
            self._log(task, request_id, f'Answer(s) graded: {response}')

            if multi_answer:
                self._log(task, request_id, 'Adding perfect answer(s) to response.')
                # TODO: check if it is answer["answer"] instead
                # (the single-answer branch below stores answer["answer"]).
                for i, answer in enumerate(perfect_answers, start=1):
                    response['perfect_answer_' + str(i)] = answer

                self._log(task, request_id, 'Getting speaking corrections in parallel')
                # Get all corrections in parallel
                fixed_texts = await asyncio.gather(*[
                    self._get_speaking_corrections(answer)
                    for answer in text_answers
                ])

                self._log(task, request_id, 'Adding transcript and fixed texts to response.')
                for i, (answer, fixed) in enumerate(zip(text_answers, fixed_texts), start=1):
                    response['transcript_' + str(i)] = answer
                    response['fixed_text_' + str(i)] = fixed
            else:
                response['transcript'] = text_answers[0]
                self._log(task, request_id, 'Requesting fixed text.')
                response['fixed_text'] = await self._get_speaking_corrections(text_answers[0])
                self._log(task, request_id, f'Fixed text: {response["fixed_text"]}')
                response['perfect_answer'] = perfect_answers[0]["answer"]

            # Upload the recordings one after another, keeping the URLs in the
            # same order as the submitted answers.
            solutions = []
            for file_name in temp_files:
                solutions.append(await self._file_storage.upload_file_firebase_get_url(
                    f'{FilePaths.FIREBASE_SPEAKING_VIDEO_FILES_PATH}{uuid.uuid4()}.wav', file_name
                ))

            response["overall"] = self._fix_speaking_overall(response["overall"], response["task_response"])
            response["solutions"] = solutions
            if multi_answer:
                response["answer"] = solutions
            else:
                response["fullPath"] = solutions[0]

            self._log(task, request_id, f'Final response: {response}')
            return response
        finally:
            self._remove_temp_files(task, request_id, created_files)

    def _remove_temp_files(self, task: int, request_id: str, file_paths: List[str]) -> None:
        """Delete the given temporary files, logging failures without raising."""
        for file_path in file_paths:
            try:
                os.remove(file_path)
            except FileNotFoundError:
                # Already gone: nothing to clean up.
                continue
            except Exception as e:
                # Best effort: this runs inside ``finally`` and must never
                # replace the grading result or the original exception.
                self._log(task, request_id, f'Error cleaning up temp file {file_path}: {str(e)}')

    def _log(self, task: int, request_id: str, message: str):
        """Write an info-level log line tagged with the task and request id."""
        self._logger.info(f'POST - speaking_task_{task} - {request_id} - {message}')

    async def _get_perfect_answer(self, task: int, question: str):
        """Ask the LLM service for a model answer to ``question``.

        Args:
            task: Speaking part number; part 1 adds a length instruction and
                uses ``GPTModels.GPT_4_O``, other parts use
                ``GPTModels.GPT_3_5_TURBO``.
            question: The question to answer.

        Returns:
            Whatever ``ILLMService.prediction`` returns; the prompt requests
            the shape ``{"answer": ...}``.
        """
        messages = [
            {
                "role": "system",
                "content": (
                    'You are a helpful assistant designed to output JSON on this format: {"answer": "perfect answer"}'
                )
            },
            {
                "role": "user",
                "content": (
                    'Provide a perfect answer according to ielts grading system to the following '
                    f'Speaking Part {task} question: "{question}"'
                )
            }
        ]

        if task == 1:
            messages.append({
                "role": "user",
                "content": 'The answer must be 2 or 3 sentences long.'
            })

        gpt_model = GPTModels.GPT_4_O if task == 1 else GPTModels.GPT_3_5_TURBO

        # The third argument (["answer"]) presumably lists the keys the
        # response must contain - confirm against ILLMService.prediction.
        return await self._llm.prediction(
            gpt_model, messages, ["answer"], TemperatureSettings.GRADING_TEMPERATURE
        )

    async def _grade_task(self, task: int, questions_and_answers: str) -> Dict:
        """Ask the LLM service to grade the formatted questions and answers.

        Args:
            task: Speaking part number; must be 1, 2 or 3.
            questions_and_answers: Text appended to the grading instruction.

        Returns:
            The result of ``ILLMService.prediction``; the prompt requests the
            shape of :meth:`_grade_template`.

        Raises:
            KeyError: If ``task`` has no task-specific instruction.
        """
        messages = [
            {
                "role": "system",
                # NOTE(review): the template is interpolated as a Python dict
                # repr (single quotes), not as strict JSON text.
                "content": (
                    f'You are a helpful assistant designed to output JSON on this format: {self._grade_template()}'
                )
            },
            {
                "role": "user",
                "content": (
                    f'Evaluate the given Speaking Part {task} response based on the IELTS grading system, ensuring a '
                    'strict assessment that penalizes errors. Deduct points for deviations from the task, and '
                    'assign a score of 0 if the response fails to address the question. Additionally, provide '
                    'detailed commentary highlighting both strengths and weaknesses in the response.'
                ) + questions_and_answers
            }
        ]

        task_specific = {
            "1": (
                'Address the student as "you". If the answers are not 2 or 3 sentences long, warn the '
                'student that they should be.'
            ),
            "2": 'Address the student as "you"',
            "3": 'Address the student as "you" and pay special attention to coherence between the answers.'
        }

        messages.append({
            "role": "user",
            "content": task_specific[str(task)]
        })

        if task in {1, 3}:
            messages.extend([
                {
                    "role": "user",
                    "content": (
                        'For pronunciations act as if you heard the answers and they were transcribed '
                        'as you heard them.'
                    )
                },
                {
                    "role": "user",
                    "content": 'The comments must be long, detailed, justify the grading and suggest improvements.'
                }
            ])

        return await self._llm.prediction(
            GPTModels.GPT_4_O, messages, ["comment"], TemperatureSettings.GRADING_TEMPERATURE
        )

    @staticmethod
    def _fix_speaking_overall(overall: float, task_response: dict):
        """Keep ``overall`` within the range of the per-criterion grades.

        Args:
            overall: Overall grade to check.
            task_response: Mapping of criterion name to a dict with a
                ``"grade"`` entry.

        Returns:
            ``overall`` when it lies between the lowest and highest criterion
            grade (or when there are no criteria to compare against);
            otherwise the criterion average rounded to a whole number.
        """
        grades = [category["grade"] for category in task_response.values()]
        if not grades:
            # Nothing to compare against; max()/min() would raise ValueError.
            return overall

        if min(grades) <= overall <= max(grades):
            return overall

        # round(x, 0) keeps a float and rounds exact halves to the even
        # neighbour.
        # NOTE(review): this yields whole bands only - confirm half bands are
        # not expected here.
        return round(sum(grades) / len(grades), 0)

    @staticmethod
    def _zero_rating(comment: str):
        """Build a grading response with every grade set to zero.

        Args:
            comment: Text stored under the top-level ``"comment"`` key.
        """
        return {
            "comment": comment,
            "overall": 0,
            "task_response": {
                "Fluency and Coherence": {
                    "grade": 0.0,
                    "comment": ""
                },
                "Lexical Resource": {
                    "grade": 0.0,
                    "comment": ""
                },
                "Grammatical Range and Accuracy": {
                    "grade": 0.0,
                    "comment": ""
                },
                "Pronunciation": {
                    "grade": 0.0,
                    "comment": ""
                }
            }
        }

    async def _get_speaking_corrections(self, text):
        """Ask the LLM service for a corrected version of a transcript.

        Args:
            text: Transcribed answer to correct.

        Returns:
            The ``"fixed_text"`` entry of the prediction result.
        """
        messages = [
            {
                "role": "system",
                "content": (
                    'You are a helpful assistant designed to output JSON on this format: '
                    '{"fixed_text": "fixed transcription with no misspelling errors"}'
                )
            },
            {
                "role": "user",
                "content": (
                    'Fix the errors in the provided transcription and put it in a JSON. '
                    f'Do not complete the answer, only replace what is wrong. \n The text: "{text}"'
                )
            }
        ]

        # 0.2 sits in the position where the other calls pass a temperature.
        # NOTE(review): the meaning of the trailing positional ``False`` is
        # not visible in this file - confirm against ILLMService.prediction.
        response = await self._llm.prediction(
            GPTModels.GPT_3_5_TURBO,
            messages,
            ["fixed_text"],
            0.2,
            False
        )
        return response["fixed_text"]

    @staticmethod
    def _grade_template():
        """Return the response shape that the grading prompt asks the LLM for."""
        return {
            "comment": "extensive comment about answer quality",
            "overall": 0.0,
            "task_response": {
                "Fluency and Coherence": {
                    "grade": 0.0,
                    "comment": (
                        "extensive comment about fluency and coherence, use examples to justify the grade awarded."
                    )
                },
                "Lexical Resource": {
                    "grade": 0.0,
                    "comment": "extensive comment about lexical resource, use examples to justify the grade awarded."
                },
                "Grammatical Range and Accuracy": {
                    "grade": 0.0,
                    "comment": (
                        "extensive comment about grammatical range and accuracy, use examples to justify the "
                        "grade awarded."
                    )
                },
                "Pronunciation": {
                    "grade": 0.0,
                    "comment": (
                        "extensive comment about pronunciation on the transcribed answer, use examples to justify the "
                        "grade awarded."
                    )
                }
            }
        }

    @staticmethod
    async def save_file(item: "GradeSpeakingItem") -> str:
        """Write the uploaded answer of ``item`` to a new file under ``tmp/``.

        Args:
            item: Item whose ``answer`` exposes an awaitable ``read()``
                returning the recording's bytes.

        Returns:
            The relative path of the written file (``tmp/<uuid4>``).

        Raises:
            Exception: Read or write errors are propagated; a partially
                written file is removed first.
        """
        sound_file_name = "tmp/" + str(uuid.uuid4())
        content = await item.answer.read()

        # The relative tmp/ directory may not exist yet (fresh checkout or
        # container); opening the file would otherwise fail.
        os.makedirs("tmp", exist_ok=True)

        try:
            async with aiofiles.open(sound_file_name, 'wb') as f:
                await f.write(content)
        except BaseException:
            # The caller only learns the path on success, so a partially
            # written file has to be removed here (also on cancellation).
            try:
                os.remove(sound_file_name)
            except OSError:
                # File was never created or is already gone; keep the
                # original error as the one that propagates.
                pass
            raise
        return sound_file_name