456 lines
18 KiB
Python
456 lines
18 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
import aiofiles
|
|
import re
|
|
import uuid
|
|
from typing import Dict, List, Optional
|
|
|
|
from app.configs.constants import (
|
|
FieldsAndExercises, GPTModels, TemperatureSettings,
|
|
FilePaths
|
|
)
|
|
from app.dtos.speaking import GradeSpeakingItem
|
|
from app.helpers import TextHelper
|
|
from app.repositories.abc import IFileStorage, IDocumentStore
|
|
from app.services.abc import ISpeakingService, ILLMService, IVideoGeneratorService, ISpeechToTextService
|
|
|
|
|
|
class SpeakingService(ISpeakingService):
|
|
|
|
def __init__(
|
|
self, llm: ILLMService,
|
|
file_storage: IFileStorage,
|
|
stt: ISpeechToTextService
|
|
):
|
|
self._llm = llm
|
|
self._file_storage = file_storage
|
|
self._stt = stt
|
|
self._logger = logging.getLogger(__name__)
|
|
|
|
# TODO: Is the difficulty in the prompts supposed to be hardcoded? The response is set with
|
|
# either the difficulty in the request or a random one yet the prompt doesn't change
|
|
self._tasks = {
|
|
"task_1": {
|
|
"get": {
|
|
"json_template": {
|
|
"first_topic": "topic 1",
|
|
"second_topic": "topic 2",
|
|
"questions": [
|
|
(
|
|
"Introductory question about the first topic, starting the topic with "
|
|
"'Let's talk about x' and then the question."
|
|
),
|
|
"Follow up question about the first topic",
|
|
"Follow up question about the first topic",
|
|
"Question about second topic",
|
|
"Follow up question about the second topic",
|
|
]
|
|
},
|
|
"prompt": (
|
|
'Craft 5 simple and single questions of easy difficulty for IELTS Speaking Part 1 '
|
|
'that encourages candidates to delve deeply into personal experiences, preferences, or '
|
|
'insights on the topic of "{first_topic}" and the topic of "{second_topic}". '
|
|
'Make sure that the generated question does not contain forbidden subjects in '
|
|
'muslim countries.'
|
|
)
|
|
}
|
|
},
|
|
"task_2": {
|
|
"get": {
|
|
"json_template": {
|
|
"topic": "topic",
|
|
"question": "question",
|
|
"prompts": [
|
|
"prompt_1",
|
|
"prompt_2",
|
|
"prompt_3"
|
|
],
|
|
"suffix": "And explain why..."
|
|
},
|
|
"prompt": (
|
|
'Create a question of medium difficulty for IELTS Speaking Part 2 '
|
|
'that encourages candidates to narrate a personal experience or story related to the topic '
|
|
'of "{topic}". Include 3 prompts that guide the candidate to describe '
|
|
'specific aspects of the experience, such as details about the situation, '
|
|
'their actions, and the reasons it left a lasting impression. Make sure that the '
|
|
'generated question does not contain forbidden subjects in muslim countries.'
|
|
)
|
|
}
|
|
},
|
|
"task_3": {
|
|
"get": {
|
|
"json_template": {
|
|
"topic": "topic",
|
|
"questions": [
|
|
"Introductory question about the topic.",
|
|
"Follow up question about the topic",
|
|
"Follow up question about the topic",
|
|
"Follow up question about the topic",
|
|
"Follow up question about the topic"
|
|
]
|
|
},
|
|
"prompt": (
|
|
'Formulate a set of 5 single questions of hard difficulty for IELTS Speaking Part 3'
|
|
'that encourage candidates to engage in a meaningful discussion on the topic of "{topic}". '
|
|
'Provide inquiries, ensuring they explore various aspects, perspectives, and implications '
|
|
'related to the topic. Make sure that the generated question does not contain forbidden '
|
|
'subjects in muslim countries.'
|
|
)
|
|
}
|
|
},
|
|
}
|
|
|
|
async def get_speaking_part(
    self, part: int, topic: str, second_topic: str, difficulty: str
) -> Dict:
    """Ask the LLM for the question set of one speaking part and tag the result.

    Args:
        part: Speaking part number; selects the ``task_<part>`` entry of ``self._tasks``.
        topic: Topic inserted into the prompt (the first topic for part 1).
        second_topic: Second topic, only inserted into the part 1 prompt.
        difficulty: Copied into the response as-is; it does not alter the prompt.

    Returns:
        The dict returned by the LLM service with ``type`` and ``difficulty`` added,
        plus ``topic`` for parts 2 and 3.

    Raises:
        KeyError: If ``self._tasks`` has no entry for ``part``.
    """
    spec = self._tasks[f'task_{part}']['get']

    # Part 1 talks about two topics, the other parts about a single one.
    if part == 1:
        topic_kwargs = {"first_topic": topic, "second_topic": second_topic}
    else:
        topic_kwargs = {"topic": topic}
    user_prompt = spec["prompt"].format(**topic_kwargs)

    system_content = (
        'You are a helpful assistant designed to output JSON on this format: '
        + str(spec["json_template"])
    )

    # Extra user instructions, appended in the same order for every request.
    extra_instructions = []
    if part == 1:
        extra_instructions.append(
            'The questions should lead to the usage of 4 verb tenses (present perfect, present, past and future).'
        )
    elif part == 2:
        extra_instructions.append(
            'The prompts must not be questions. Also include a suffix like the ones in the IELTS exams '
            'that start with "And explain why".'
        )
    if part in (1, 3):
        extra_instructions.append(
            'They must be 1 single question each and not be double-barreled questions.'
        )

    messages = [
        {"role": "system", "content": system_content},
        {"role": "user", "content": user_prompt},
    ]
    messages.extend({"role": "user", "content": text} for text in extra_instructions)

    if part == 1:
        fields_to_check = ["first_topic"]
    else:
        fields_to_check = FieldsAndExercises.GEN_FIELDS

    response = await self._llm.prediction(
        GPTModels.GPT_4_O, messages, fields_to_check, TemperatureSettings.GEN_QUESTION_TEMPERATURE
    )

    if part == 3:
        # Strip a leading "<number>." enumeration; the pattern is anchored at the start,
        # so questions that do not begin with a number come back unchanged.
        leading_number = re.compile(r"^\d+\.\s*")
        response["questions"] = [leading_number.sub("", entry) for entry in response["questions"]]

    response["type"] = part
    response["difficulty"] = difficulty

    if part in (2, 3):
        response["topic"] = topic

    return response
|
|
|
|
async def grade_speaking_task(self, task: int, items: List[GradeSpeakingItem]) -> Dict:
    """Transcribe, grade and annotate the recorded answers of one speaking task.

    Each item's audio is written to a temporary file and transcribed. The transcriptions
    are then graded together by the LLM and the result is extended with transcripts,
    corrected texts and model ("perfect") answers. Temporary files are always removed
    before returning, including when a step fails.

    Args:
        task: Speaking part being graded. Parts 1 and 3 grade every item together and use
            numbered response keys; any other value grades only the first item.
        items: Question/recording pairs to grade.

    Returns:
        The grading dict from the LLM plus the extra keys described above, or the
        ``_zero_rating`` payload when any transcription fails the 20-word check.
    """
    request_id = str(uuid.uuid4())
    self._log(task, request_id, f"Received request to grade speaking task {task}.")

    if task != 2:
        self._log(task, request_id, f'Received {len(items)} total answers.')

    temp_files = []
    try:
        # Save all files first. Every outcome is collected instead of failing fast, so the
        # files that were written are known to the cleanup below even when a sibling save
        # raised; the first failure is re-raised afterwards.
        save_results = await asyncio.gather(
            *[self.save_file(item) for item in items],
            return_exceptions=True
        )
        temp_files = [result for result in save_results if not isinstance(result, BaseException)]
        for result in save_results:
            if isinstance(result, BaseException):
                raise result

        # Transcribe all recordings concurrently. No concurrency limit is applied here;
        # NOTE(review): an earlier comment mentioned "up to 4" - confirm whether the STT
        # service enforces that itself.
        self._log(task, request_id, 'Starting batch transcription')
        text_answers = await asyncio.gather(*[
            self._stt.speech_to_text(file_path)
            for file_path in temp_files
        ])

        # A single too-short answer short-circuits the whole request with a zero rating.
        for answer in text_answers:
            self._log(task, request_id, f'Transcribed answer: {answer}')
            if not TextHelper.has_x_words(answer, 20):
                self._log(
                    task, request_id,
                    f'The answer had less words than threshold 20 to be graded. Answer: {answer}'
                )
                return self._zero_rating("The audio recorded does not contain enough english words to be graded.")

        # Get perfect answers
        self._log(task, request_id, 'Requesting perfect answers')
        perfect_answers = await asyncio.gather(*[
            self._get_perfect_answer(task, item.question)
            for item in items
        ])

        # Format the responses
        if task in {1, 3}:
            self._log(task, request_id, 'Formatting answers and questions for prompt.')

            formatted_text = "".join(
                f"**Question {i}:**\n{item.question}\n\n"
                f"**Answer {i}:**\n{transcribed_answer}\n\n"
                for i, (item, transcribed_answer) in enumerate(zip(items, text_answers), start=1)
            )

            self._log(task, request_id, f'Formatted answers and questions for prompt: {formatted_text}')
            questions_and_answers = f'\n\n The questions and answers are: \n\n{formatted_text}'
        else:
            questions_and_answers = f'\n Question: "{items[0].question}" \n Answer: "{text_answers[0]}"'

        self._log(task, request_id, 'Requesting grading of the answer(s).')
        response = await self._grade_task(task, questions_and_answers)
        self._log(task, request_id, f'Answer(s) graded: {response}')

        if task in {1, 3}:
            self._log(task, request_id, 'Adding perfect answer(s) to response.')

            # TODO: check if it is answer["answer"] instead
            # (the single-answer branch below unwraps the "answer" key; this one stores
            # the whole dict returned by _get_perfect_answer).
            for i, answer in enumerate(perfect_answers, start=1):
                response['perfect_answer_' + str(i)] = answer

            self._log(task, request_id, 'Getting speaking corrections in parallel')
            # Get all corrections in parallel
            fixed_texts = await asyncio.gather(*[
                self._get_speaking_corrections(answer)
                for answer in text_answers
            ])

            self._log(task, request_id, 'Adding transcript and fixed texts to response.')
            for i, (answer, fixed) in enumerate(zip(text_answers, fixed_texts), start=1):
                response['transcript_' + str(i)] = answer
                response['fixed_text_' + str(i)] = fixed
        else:
            response['transcript'] = text_answers[0]

            self._log(task, request_id, 'Requesting fixed text.')
            response['fixed_text'] = await self._get_speaking_corrections(text_answers[0])
            self._log(task, request_id, f'Fixed text: {response["fixed_text"]}')

            response['perfect_answer'] = perfect_answers[0]["answer"]

        response["overall"] = self._fix_speaking_overall(response["overall"], response["task_response"])
        self._log(task, request_id, f'Final response: {response}')
        return response

    finally:
        # Best-effort cleanup: a failure to delete is logged, never raised, so it cannot
        # mask the real outcome of the request.
        for file_path in temp_files:
            try:
                os.remove(file_path)
            except FileNotFoundError:
                # Already gone; nothing to clean up.
                pass
            except Exception as e:
                self._log(task, request_id, f'Error cleaning up temp file {file_path}: {str(e)}')
|
|
|
|
def _log(self, task: int, request_id: str, message: str):
|
|
self._logger.info(f'POST - speaking_task_{task} - {request_id} - {message}')
|
|
|
|
@staticmethod
async def save_file(item: GradeSpeakingItem) -> str:
    """Write the item's uploaded audio to a uniquely named file and return its path.

    The path is ``FilePaths.AUDIO_FILES_PATH`` with a random UUID appended. If writing
    fails or is cancelled, the partially written file is removed before the error
    propagates, because the caller never receives the path and could not clean it up.

    Args:
        item: Item whose ``answer`` exposes an awaitable ``read()`` returning the bytes.

    Returns:
        Path of the file that was written.
    """
    sound_file_name = FilePaths.AUDIO_FILES_PATH + str(uuid.uuid4())
    content = await item.answer.read()
    try:
        async with aiofiles.open(sound_file_name, 'wb') as f:
            await f.write(content)
    except BaseException:
        # BaseException so that task cancellation also triggers the cleanup.
        try:
            os.remove(sound_file_name)
        except OSError:
            # The file was never created or is already gone; the original error matters more.
            pass
        raise
    return sound_file_name
|
|
|
|
# ==================================================================================================================
|
|
# grade_speaking_task helpers
|
|
# ==================================================================================================================
|
|
|
|
async def _get_perfect_answer(self, task: int, question: str):
    """Ask the LLM for a model answer to a single speaking question.

    Args:
        task: Speaking part number, quoted in the prompt. Part 1 also adds a length
            instruction and uses a different model than the other parts.
        question: Question text inserted into the prompt.

    Returns:
        Whatever ``self._llm.prediction`` returns; the request asks for the JSON shape
        ``{"answer": ...}`` and passes ``["answer"]`` as the fields to check.
    """
    system_message = {
        "role": "system",
        "content": (
            'You are a helpful assistant designed to output JSON on this format: {"answer": "perfect answer"}'
        )
    }
    request_message = {
        "role": "user",
        "content": (
            'Provide a perfect answer according to ielts grading system to the following '
            f'Speaking Part {task} question: "{question}"'
        )
    }
    messages = [system_message, request_message]

    if task == 1:
        messages.append({
            "role": "user",
            "content": 'The answer must be 2 or 3 sentences long.'
        })
        gpt_model = GPTModels.GPT_4_O
    else:
        gpt_model = GPTModels.GPT_3_5_TURBO

    prediction = await self._llm.prediction(
        gpt_model, messages, ["answer"], TemperatureSettings.GRADING_TEMPERATURE
    )
    return prediction
|
|
|
|
async def _grade_task(self, task: int, questions_and_answers: str) -> Dict:
    """Send the formatted questions and answers to the LLM for IELTS-style grading.

    Args:
        task: Speaking part number; quoted in the prompt and used to pick the
            part-specific instruction.
        questions_and_answers: Pre-formatted text appended to the grading prompt.

    Returns:
        Whatever ``self._llm.prediction`` returns; the request asks for the shape
        produced by ``_grade_template`` and passes ``["comment"]`` as the fields to check.

    Raises:
        KeyError: If ``task`` is not 1, 2 or 3.
    """
    system_content = (
        'You are a helpful assistant designed to output JSON on this format: '
        + str(self._grade_template())
    )
    grading_request = (
        f'Evaluate the given Speaking Part {task} response based on the IELTS grading system, ensuring a '
        'strict assessment that penalizes errors. Deduct points for deviations from the task, and '
        'assign a score of 0 if the response fails to address the question. Additionally, provide '
        'detailed commentary highlighting both strengths and weaknesses in the response.'
    )

    messages = [
        {"role": "system", "content": system_content},
        {"role": "user", "content": grading_request + questions_and_answers},
    ]

    # Instruction that depends on the speaking part.
    instruction_by_task = {
        "1": (
            'Address the student as "you". If the answers are not 2 or 3 sentences long, warn the '
            'student that they should be.'
        ),
        "2": 'Address the student as "you"',
        "3": 'Address the student as "you" and pay special attention to coherence between the answers.'
    }
    follow_ups = [instruction_by_task[str(task)]]

    # The multi-answer parts get two additional instructions.
    if task in (1, 3):
        follow_ups.append(
            'For pronunciations act as if you heard the answers and they were transcribed '
            'as you heard them.'
        )
        follow_ups.append(
            'The comments must be long, detailed, justify the grading and suggest improvements.'
        )

    for instruction in follow_ups:
        messages.append({"role": "user", "content": instruction})

    graded = await self._llm.prediction(
        GPTModels.GPT_4_O, messages, ["comment"], TemperatureSettings.GRADING_TEMPERATURE
    )
    return graded
|
|
|
|
@staticmethod
|
|
def _fix_speaking_overall(overall: float, task_response: dict):
|
|
grades = [category["grade"] for category in task_response.values()]
|
|
|
|
if overall > max(grades) or overall < min(grades):
|
|
total_sum = sum(grades)
|
|
average = total_sum / len(grades)
|
|
rounded_average = round(average, 0)
|
|
return rounded_average
|
|
|
|
return overall
|
|
|
|
@staticmethod
|
|
def _zero_rating(comment: str):
|
|
return {
|
|
"comment": comment,
|
|
"overall": 0,
|
|
"task_response": {
|
|
"Fluency and Coherence": {
|
|
"grade": 0.0,
|
|
"comment": ""
|
|
},
|
|
"Lexical Resource": {
|
|
"grade": 0.0,
|
|
"comment": ""
|
|
},
|
|
"Grammatical Range and Accuracy": {
|
|
"grade": 0.0,
|
|
"comment": ""
|
|
},
|
|
"Pronunciation": {
|
|
"grade": 0.0,
|
|
"comment": ""
|
|
}
|
|
}
|
|
}
|
|
|
|
async def _get_speaking_corrections(self, text):
    """Ask the LLM to correct errors in a transcription without completing it.

    Args:
        text: Transcribed answer inserted into the prompt.

    Returns:
        The ``"fixed_text"`` value of the LLM service's response.
    """
    system_content = (
        'You are a helpful assistant designed to output JSON on this format: '
        '{"fixed_text": "fixed transcription with no misspelling errors"}'
    )
    user_content = (
        'Fix the errors in the provided transcription and put it in a JSON. '
        f'Do not complete the answer, only replace what is wrong. \n The text: "{text}"'
    )
    messages = [
        {"role": "system", "content": system_content},
        {"role": "user", "content": user_content},
    ]

    # NOTE(review): the trailing positional False is passed through to the LLM service
    # as-is; its meaning is defined there - confirm against ILLMService.prediction.
    prediction = await self._llm.prediction(
        GPTModels.GPT_3_5_TURBO, messages, ["fixed_text"], 0.2, False
    )
    return prediction["fixed_text"]
|
|
|
|
|
|
@staticmethod
|
|
def _grade_template():
|
|
return {
|
|
"comment": "extensive comment about answer quality",
|
|
"overall": 0.0,
|
|
"task_response": {
|
|
"Fluency and Coherence": {
|
|
"grade": 0.0,
|
|
"comment": (
|
|
"extensive comment about fluency and coherence, use examples to justify the grade awarded."
|
|
)
|
|
},
|
|
"Lexical Resource": {
|
|
"grade": 0.0,
|
|
"comment": "extensive comment about lexical resource, use examples to justify the grade awarded."
|
|
},
|
|
"Grammatical Range and Accuracy": {
|
|
"grade": 0.0,
|
|
"comment": (
|
|
"extensive comment about grammatical range and accuracy, use examples to justify the "
|
|
"grade awarded."
|
|
)
|
|
},
|
|
"Pronunciation": {
|
|
"grade": 0.0,
|
|
"comment": (
|
|
"extensive comment about pronunciation on the transcribed answer, use examples to justify the "
|
|
"grade awarded."
|
|
)
|
|
}
|
|
}
|
|
} |