Async release

This commit is contained in:
Carlos Mesquita
2024-07-23 08:40:35 +01:00
parent a4caecdb4f
commit 3cf9fa5cba
116 changed files with 5609 additions and 30630 deletions

View File

@@ -0,0 +1,521 @@
import logging
import os
import re
import uuid
import random
from typing import Dict, List
from app.repositories.abc import IFileStorage, IDocumentStore
from app.services.abc import ISpeakingService, ILLMService, IVideoGeneratorService, ISpeechToTextService
from app.configs.constants import (
FieldsAndExercises, GPTModels, TemperatureSettings,
AvatarEnum, FilePaths
)
from app.helpers import TextHelper
class SpeakingService(ISpeakingService):
def __init__(
self, llm: ILLMService, vid_gen: IVideoGeneratorService,
file_storage: IFileStorage, document_store: IDocumentStore,
stt: ISpeechToTextService
):
self._llm = llm
self._vid_gen = vid_gen
self._file_storage = file_storage
self._document_store = document_store
self._stt = stt
self._logger = logging.getLogger(__name__)
self._tasks = {
"task_1": {
"get": {
"json_template": (
'{"topic": "topic", "question": "question"}'
),
"prompt": (
'Craft a thought-provoking question of {difficulty} difficulty for IELTS Speaking Part 1 '
'that encourages candidates to delve deeply into personal experiences, preferences, or '
'insights on the topic of "{topic}". Instruct the candidate to offer not only detailed '
'descriptions but also provide nuanced explanations, examples, or anecdotes to enrich '
'their response. Make sure that the generated question does not contain forbidden subjects in '
'muslim countries.'
)
}
},
"task_2": {
"get": {
"json_template": (
'{"topic": "topic", "question": "question", "prompts": ["prompt_1", "prompt_2", "prompt_3"]}'
),
"prompt": (
'Create a question of {difficulty} difficulty for IELTS Speaking Part 2 '
'that encourages candidates to narrate a personal experience or story related to the topic '
'of "{topic}". Include 3 prompts that guide the candidate to describe '
'specific aspects of the experience, such as details about the situation, '
'their actions, and the reasons it left a lasting impression. Make sure that the '
'generated question does not contain forbidden subjects in muslim countries.'
)
}
},
"task_3": {
"get": {
"json_template": (
'{"topic": "topic", "questions": ["question", "question", "question"]}'
),
"prompt": (
'Formulate a set of 3 questions of {difficulty} difficulty for IELTS Speaking Part 3 '
'that encourage candidates to engage in a meaningful discussion on the topic of "{topic}". '
'Provide inquiries, ensuring they explore various aspects, perspectives, and implications '
'related to the topic. Make sure that the generated question does not contain forbidden '
'subjects in muslim countries.'
)
}
},
}
async def get_speaking_task(self, task_id: int, topic: str, difficulty: str):
task_values = self._tasks[f'task_{task_id}']['get']
messages = [
{
"role": "system",
"content": (
'You are a helpful assistant designed to output JSON on this format: ' +
task_values["json_template"]
)
},
{
"role": "user",
"content": str(task_values["prompt"]).format(topic=topic, difficulty=difficulty)
}
]
response = await self._llm.prediction(
GPTModels.GPT_4_O, messages, FieldsAndExercises.GEN_FIELDS, TemperatureSettings.GEN_QUESTION_TEMPERATURE
)
# TODO: this was on GET /speaking_task_3 don't know if it is intentional only for 3
if task_id == 3:
# Remove the numbers from the questions only if the string starts with a number
response["questions"] = [
re.sub(r"^\d+\.\s*", "", question)
if re.match(r"^\d+\.", question) else question
for question in response["questions"]
]
response["type"] = task_id
response["difficulty"] = difficulty
response["topic"] = topic
return response
async def grade_speaking_task_1_and_2(
self, task: int, question: str, answer_firebase_path: str, sound_file_name: str
):
request_id = uuid.uuid4()
req_data = {
"question": question,
"answer": answer_firebase_path
}
self._logger.info(
f'POST - speaking_task_{task} - Received request to grade speaking task {task}. '
f'Use this id to track the logs: {str(request_id)} - Request data: {str(req_data)}'
)
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Downloading file {answer_firebase_path}')
await self._file_storage.download_firebase_file(answer_firebase_path, sound_file_name)
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Downloaded file {answer_firebase_path} to {sound_file_name}')
answer = await self._stt.speech_to_text(sound_file_name)
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Transcripted answer: {answer}')
if TextHelper.has_x_words(answer, 20):
messages = [
{
"role": "system",
"content": (
'You are a helpful assistant designed to output JSON on this format: '
'{"comment": "comment about answer quality", "overall": 0.0, '
'"task_response": {"Fluency and Coherence": 0.0, "Lexical Resource": 0.0, '
'"Grammatical Range and Accuracy": 0.0, "Pronunciation": 0.0}}')
},
{
"role": "user",
"content": (
f'Evaluate the given Speaking Part {task} response based on the IELTS grading system, ensuring a '
'strict assessment that penalizes errors. Deduct points for deviations from the task, and '
'assign a score of 0 if the response fails to address the question. Additionally, provide '
'detailed commentary highlighting both strengths and weaknesses in the response.'
f'\n Question: "{question}" \n Answer: "{answer}"')
}
]
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Requesting grading of the answer.')
response = await self._llm.prediction(
GPTModels.GPT_3_5_TURBO,
messages,
["comment"],
TemperatureSettings.GRADING_TEMPERATURE
)
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Answer graded: {str(response)}')
perfect_answer_messages = [
{
"role": "system",
"content": (
'You are a helpful assistant designed to output JSON on this format: '
'{"answer": "perfect answer"}'
)
},
{
"role": "user",
"content": (
'Provide a perfect answer according to ielts grading system to the following '
f'Speaking Part {task} question: "{question}"')
}
]
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Requesting perfect answer.')
response = await self._llm.prediction(
GPTModels.GPT_3_5_TURBO,
perfect_answer_messages,
["answer"],
TemperatureSettings.GEN_QUESTION_TEMPERATURE
)
response['perfect_answer'] = response["answer"]
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Perfect answer: ' + response['perfect_answer'])
response['transcript'] = answer
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Requesting fixed text.')
response['fixed_text'] = await self._get_speaking_corrections(answer)
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Fixed text: ' + response['fixed_text'])
if response["overall"] == "0.0" or response["overall"] == 0.0:
response["overall"] = self._calculate_overall(response)
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Final response: {str(response)}')
return response
else:
self._logger.info(
f'POST - speaking_task_{task} - {str(request_id)} - '
f'The answer had less words than threshold 20 to be graded. Answer: {answer}'
)
return self._zero_rating("The audio recorded does not contain enough english words to be graded.")
# TODO: When there's more time grade_speaking_task_1_2 can be merged with this, when there's more time
async def grade_speaking_task_3(self, answers: Dict, task: int = 3):
request_id = uuid.uuid4()
self._logger.info(
f'POST - speaking_task_{task} - Received request to grade speaking task {task}. '
f'Use this id to track the logs: {str(request_id)} - Request data: {str(answers)}'
)
text_answers = []
perfect_answers = []
self._logger.info(
f'POST - speaking_task_{task} - {str(request_id)} - Received {str(len(answers))} total answers.'
)
for item in answers:
sound_file_name = FilePaths.AUDIO_FILES_PATH + str(uuid.uuid4())
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Downloading file {item["answer"]}')
await self._file_storage.download_firebase_file(item["answer"], sound_file_name)
self._logger.info(
f'POST - speaking_task_{task} - {str(request_id)} - '
'Downloaded file ' + item["answer"] + f' to {sound_file_name}'
)
answer_text = await self._stt.speech_to_text(sound_file_name)
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Transcripted answer: {answer_text}')
text_answers.append(answer_text)
item["answer"] = answer_text
os.remove(sound_file_name)
if not TextHelper.has_x_words(answer_text, 20):
self._logger.info(
f'POST - speaking_task_{task} - {str(request_id)} - '
f'The answer had less words than threshold 20 to be graded. Answer: {answer_text}')
return self._zero_rating("The audio recorded does not contain enough english words to be graded.")
perfect_answer_messages = [
{
"role": "system",
"content": (
'You are a helpful assistant designed to output JSON on this format: '
'{"answer": "perfect answer"}'
)
},
{
"role": "user",
"content": (
'Provide a perfect answer according to ielts grading system to the following '
f'Speaking Part {task} question: "{item["question"]}"'
)
}
]
self._logger.info(
f'POST - speaking_task_{task} - {str(request_id)} - '
f'Requesting perfect answer for question: {item["question"]}'
)
perfect_answers.append(
await self._llm.prediction(
GPTModels.GPT_3_5_TURBO,
perfect_answer_messages,
["answer"],
TemperatureSettings.GEN_QUESTION_TEMPERATURE
)
)
messages = [
{
"role": "system",
"content": (
'You are a helpful assistant designed to output JSON on this format: '
'{"comment": "comment about answer quality", "overall": 0.0, '
'"task_response": {"Fluency and Coherence": 0.0, "Lexical Resource": 0.0, '
'"Grammatical Range and Accuracy": 0.0, "Pronunciation": 0.0}}')
}
]
message = (
f"Evaluate the given Speaking Part {task} response based on the IELTS grading system, ensuring a "
"strict assessment that penalizes errors. Deduct points for deviations from the task, and "
"assign a score of 0 if the response fails to address the question. Additionally, provide detailed "
"commentary highlighting both strengths and weaknesses in the response."
"\n\n The questions and answers are: \n\n'")
self._logger.info(
f'POST - speaking_task_{task} - {str(request_id)} - Formatting answers and questions for prompt.'
)
formatted_text = ""
for i, entry in enumerate(answers, start=1):
formatted_text += f"**Question {i}:**\n{entry['question']}\n\n"
formatted_text += f"**Answer {i}:**\n{entry['answer']}\n\n"
self._logger.info(
f'POST - speaking_task_{task} - {str(request_id)} - Formatted answers and questions for prompt: {formatted_text}'
)
message += formatted_text
messages.append({
"role": "user",
"content": message
})
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Requesting grading of the answers.')
response = await self._llm.prediction(
GPTModels.GPT_3_5_TURBO, messages, ["comment"], TemperatureSettings.GRADING_TEMPERATURE
)
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Answers graded: {str(response)}')
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Adding perfect answers to response.')
for i, answer in enumerate(perfect_answers, start=1):
response['perfect_answer_' + str(i)] = answer
self._logger.info(
f'POST - speaking_task_{task} - {str(request_id)} - Adding transcript and fixed texts to response.'
)
for i, answer in enumerate(text_answers, start=1):
response['transcript_' + str(i)] = answer
response['fixed_text_' + str(i)] = await self._get_speaking_corrections(answer)
if response["overall"] == "0.0" or response["overall"] == 0.0:
response["overall"] = self._calculate_overall(response)
self._logger.info(f'POST - speaking_task_{task} - {str(request_id)} - Final response: {str(response)}')
return response
# ==================================================================================================================
# grade_speaking_task helpers
# ==================================================================================================================
@staticmethod
def _zero_rating(comment: str):
return {
"comment": comment,
"overall": 0,
"task_response": {
"Fluency and Coherence": 0,
"Lexical Resource": 0,
"Grammatical Range and Accuracy": 0,
"Pronunciation": 0
}
}
@staticmethod
def _calculate_overall(response: Dict):
return round(
(
response["task_response"]["Fluency and Coherence"] +
response["task_response"]["Lexical Resource"] +
response["task_response"]["Grammatical Range and Accuracy"] +
response["task_response"]["Pronunciation"]
) / 4, 1
)
async def _get_speaking_corrections(self, text):
messages = [
{
"role": "system",
"content": (
'You are a helpful assistant designed to output JSON on this format: '
'{"fixed_text": "fixed transcription with no misspelling errors"}'
)
},
{
"role": "user",
"content": (
'Fix the errors in the provided transcription and put it in a JSON. '
f'Do not complete the answer, only replace what is wrong. \n The text: "{text}"'
)
}
]
response = await self._llm.prediction(
GPTModels.GPT_3_5_TURBO,
messages,
["fixed_text"],
0.2,
False
)
return response["fixed_text"]
async def create_videos_and_save_to_db(self, exercises, template, req_id):
template = await self._create_video_per_part(exercises, template, 1)
template = await self._create_video_per_part(exercises, template, 2)
template = await self._create_video_per_part(exercises, template, 3)
await self._document_store.save_to_db_with_id("speaking", template, req_id)
self._logger.info(f'Saved speaking to DB with id {req_id} : {str(template)}')
async def _create_video_per_part(self, exercises: List[Dict], template: Dict, part: int):
template_index = part - 1
# Using list comprehension to find the element with the desired value in the 'type' field
found_exercises = [element for element in exercises if element.get('type') == part]
# Check if any elements were found
if found_exercises:
exercise = found_exercises[0]
self._logger.info(f'Creating video for speaking part {part}')
if part in {1, 2}:
result = await self._create_video(
exercise["question"],
(random.choice(list(AvatarEnum))).value,
f'Failed to create video for part {part} question: {str(exercise["question"])}'
)
if result is not None:
if part == 2:
template["exercises"][template_index]["prompts"] = exercise["prompts"]
template["exercises"][template_index]["text"] = exercise["question"]
template["exercises"][template_index]["title"] = exercise["topic"]
template["exercises"][template_index]["video_url"] = result["video_url"]
template["exercises"][template_index]["video_path"] = result["video_path"]
else:
questions = []
for question in exercise["questions"]:
result = await self._create_video(
question,
(random.choice(list(AvatarEnum))).value,
f'Failed to create video for part {part} question: {str(exercise["question"])}'
)
if result is not None:
video = {
"text": question,
"video_path": result["video_path"],
"video_url": result["video_url"]
}
questions.append(video)
template["exercises"][template_index]["prompts"] = questions
template["exercises"][template_index]["title"] = exercise["topic"]
if not found_exercises:
template["exercises"].pop(template_index)
return template
# TODO: Check if it is intended to log the original question
async def generate_speaking_video(self, original_question: str, topic: str, avatar: str, prompts: List[str]):
if len(prompts) > 0:
question = original_question + " In your answer you should consider: " + " ".join(prompts)
else:
question = original_question
error_msg = f'Failed to create video for part 1 question: {original_question}'
result = await self._create_video(
question,
avatar,
error_msg
)
if result is not None:
return {
"text": original_question,
"prompts": prompts,
"title": topic,
**result,
"type": "speaking",
"id": uuid.uuid4()
}
else:
return str(error_msg)
async def generate_interactive_video(self, questions: List[str], avatar: str, topic: str):
sp_questions = []
self._logger.info('Creating videos for speaking part 3')
for question in questions:
result = await self._create_video(
question,
avatar,
f'Failed to create video for part 3 question: {question}'
)
if result is not None:
video = {
"text": question,
**result
}
sp_questions.append(video)
return {
"prompts": sp_questions,
"title": topic,
"type": "interactiveSpeaking",
"id": uuid.uuid4()
}
async def _create_video(self, question: str, avatar: str, error_message: str):
result = await self._vid_gen.create_video(question, avatar)
if result is not None:
sound_file_path = FilePaths.VIDEO_FILES_PATH + result
firebase_file_path = FilePaths.FIREBASE_SPEAKING_VIDEO_FILES_PATH + result
url = await self._file_storage.upload_file_firebase_get_url(firebase_file_path, sound_file_path)
return {
"video_path": firebase_file_path,
"video_url": url
}
self._logger.error(error_message)
return None