From a7da187ec683b26279511fc37a1d757be8178379 Mon Sep 17 00:00:00 2001 From: Carlos-Mesquita Date: Mon, 25 Nov 2024 16:41:38 +0000 Subject: [PATCH] Writing and speaking rework, some changes to module upload --- app/api/grade.py | 49 +++-- app/api/level.py | 1 - app/api/speaking.py | 8 +- app/configs/dependency_injection.py | 11 +- app/controllers/abc/grade.py | 18 +- app/controllers/impl/grade.py | 94 ++++++++-- app/dtos/evaluation.py | 18 ++ app/dtos/speaking.py | 28 +-- app/dtos/writing.py | 2 + .../impl/document_stores/mongo.py | 4 +- app/server.py | 8 +- app/services/abc/__init__.py | 4 +- app/services/abc/evaluation.py | 33 ++++ app/services/abc/exam/speaking.py | 2 +- app/services/impl/exam/evaluation.py | 108 +++++++++++ app/services/impl/exam/level/upload.py | 28 +-- .../impl/exam/reading/import_reading.py | 2 +- app/services/impl/exam/speaking.py | 169 +++++++++--------- app/services/impl/exam/writing.py | 29 ++- app/services/impl/third_parties/whisper.py | 74 ++++++-- 20 files changed, 495 insertions(+), 195 deletions(-) create mode 100644 app/dtos/evaluation.py create mode 100644 app/services/abc/evaluation.py create mode 100644 app/services/impl/exam/evaluation.py diff --git a/app/api/grade.py b/app/api/grade.py index 84a1a0b..b3032ad 100644 --- a/app/api/grade.py +++ b/app/api/grade.py @@ -1,9 +1,8 @@ from dependency_injector.wiring import inject, Provide -from fastapi import APIRouter, Depends, Path, Request +from fastapi import APIRouter, Depends, Path, Request, BackgroundTasks from app.controllers.abc import IGradeController from app.dtos.writing import WritingGradeTaskDTO -from app.dtos.speaking import GradeSpeakingAnswersDTO, GradeSpeakingDTO from app.middlewares import Authorized, IsAuthenticatedViaBearerToken controller = "grade_controller" @@ -17,35 +16,51 @@ grade_router = APIRouter() @inject async def grade_writing_task( data: WritingGradeTaskDTO, + background_tasks: BackgroundTasks, task: int = Path(..., ge=1, le=2), grade_controller: IGradeController = Depends(Provide[controller]) ): - return await grade_controller.grade_writing_task(task, data) + return await grade_controller.grade_writing_task(task, data, background_tasks) -@grade_router.post( - '/speaking/2', - dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))] -) -@inject -async def grade_speaking_task_2( - data: GradeSpeakingDTO, - grade_controller: IGradeController = Depends(Provide[controller]) -): - return await grade_controller.grade_speaking_task(2, [data.dict()]) - @grade_router.post( '/speaking/{task}', dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))] ) @inject -async def grade_speaking_task_1_and_3( - data: GradeSpeakingAnswersDTO, +async def grade_speaking_task( + request: Request, + background_tasks: BackgroundTasks, task: int = Path(..., ge=1, le=3), grade_controller: IGradeController = Depends(Provide[controller]) ): - return await grade_controller.grade_speaking_task(task, data.answers) + form = await request.form() + return await grade_controller.grade_speaking_task(task, form, background_tasks) + + +@grade_router.get( + '/pending/{sessionId}', + dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))] +) +@inject +async def get_pending_evaluations( + session_id: str, + grade_controller: IGradeController = Depends(Provide[controller]) +): + return await grade_controller.get_evaluations(session_id, "pending") + + +@grade_router.get( + '/completed/{sessionId}', + dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))] +) +@inject +async def get_completed_evaluations( + session_id: str, + grade_controller: IGradeController = Depends(Provide[controller]) +): + return await grade_controller.get_evaluations(session_id, "completed") @grade_router.post( diff --git a/app/api/level.py b/app/api/level.py index 0f5eb41..2bf9cd6 100644 --- a/app/api/level.py +++ b/app/api/level.py @@ -18,7 +18,6 @@ async def generate_exercises( dto: LevelExercisesDTO, level_controller: ILevelController = Depends(Provide[controller]) ): - print(dto.dict()) return await level_controller.generate_exercises(dto) @level_router.get( diff --git a/app/api/speaking.py b/app/api/speaking.py index 350459a..e4245fb 100644 --- a/app/api/speaking.py +++ b/app/api/speaking.py @@ -3,9 +3,8 @@ from typing import Optional from dependency_injector.wiring import inject, Provide from fastapi import APIRouter, Path, Query, Depends -from pydantic import BaseModel -from app.dtos.video import Task, TaskStatus +from app.dtos.speaking import Video from app.middlewares import Authorized, IsAuthenticatedViaBearerToken from app.configs.constants import EducationalContent from app.controllers.abc import ISpeakingController @@ -13,11 +12,6 @@ from app.controllers.abc import ISpeakingController controller = "speaking_controller" speaking_router = APIRouter() - -class Video(BaseModel): - text: str - avatar: str - @speaking_router.post( '/media', dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))] diff --git a/app/configs/dependency_injection.py b/app/configs/dependency_injection.py index 10818b1..a468440 100644 --- a/app/configs/dependency_injection.py +++ b/app/configs/dependency_injection.py @@ -18,11 +18,11 @@ load_dotenv() class DependencyInjector: - def __init__(self, polly_client: any, http_client: HTTPClient, whisper_model: any): + def __init__(self, polly_client: any, http_client: HTTPClient, stt: OpenAIWhisper): self._container = containers.DynamicContainer() self._polly_client = polly_client self._http_client = http_client - self._whisper_model = whisper_model + self._stt = stt def inject(self): self._setup_clients() @@ -33,22 +33,25 @@ class DependencyInjector: self._container.wire( packages=["app"] ) + return self def _setup_clients(self): self._container.openai_client = providers.Singleton(AsyncOpenAI) self._container.polly_client = providers.Object(self._polly_client) self._container.http_client = providers.Object(self._http_client) - self._container.whisper_model = providers.Object(self._whisper_model) + self._container.stt = providers.Object(self._stt) def _setup_third_parties(self): self._container.llm = providers.Factory(OpenAI, client=self._container.openai_client) - self._container.stt = providers.Factory(OpenAIWhisper, model=self._container.whisper_model) self._container.tts = providers.Factory(AWSPolly, client=self._container.polly_client) + + """ with open('app/services/impl/third_parties/elai/conf.json', 'r') as file: elai_conf = json.load(file) with open('app/services/impl/third_parties/elai/avatars.json', 'r') as file: elai_avatars = json.load(file) + """ with open('app/services/impl/third_parties/heygen/avatars.json', 'r') as file: heygen_avatars = json.load(file) diff --git a/app/controllers/abc/grade.py b/app/controllers/abc/grade.py index a9853ce..f9c2e64 100644 --- a/app/controllers/abc/grade.py +++ b/app/controllers/abc/grade.py @@ -1,15 +1,27 @@ from abc import ABC, abstractmethod -from typing import Dict, List +from typing import Dict, List, Union +from fastapi import BackgroundTasks +from fastapi.datastructures import FormData class IGradeController(ABC): @abstractmethod - async def grade_writing_task(self, task: int, data): + async def grade_writing_task( + self, session_id: str, exercise_id: str, + task: int, dto: any, + background_tasks: BackgroundTasks + ): pass @abstractmethod - async def grade_speaking_task(self, task: int, answers: List[Dict]) -> Dict: + async def grade_speaking_task( + self, task: int, form: FormData, background_tasks: BackgroundTasks + ): + pass + + @abstractmethod + async def get_evaluations(self, session_id: str, status: str): pass @abstractmethod diff --git a/app/controllers/impl/grade.py b/app/controllers/impl/grade.py index b91c180..0998f35 100644 --- a/app/controllers/impl/grade.py +++ b/app/controllers/impl/grade.py @@ -1,34 +1,96 @@ import logging -from typing import Dict, List +from typing import Dict, List, Union +from uuid import uuid4 + +from fastapi import BackgroundTasks, Response, HTTPException +from fastapi.datastructures import FormData -from app.configs.constants import FilePaths from app.controllers.abc import IGradeController +from app.dtos.evaluation import EvaluationType +from app.dtos.speaking import GradeSpeakingItem from app.dtos.writing import WritingGradeTaskDTO -from app.helpers import FileHelper -from app.services.abc import ISpeakingService, IWritingService, IGradeService -from app.utils import handle_exception - +from app.services.abc import IGradeService, IEvaluationService class GradeController(IGradeController): def __init__( self, grade_service: IGradeService, - speaking_service: ISpeakingService, - writing_service: IWritingService + evaluation_service: IEvaluationService, ): self._service = grade_service - self._speaking_service = speaking_service - self._writing_service = writing_service + self._evaluation_service = evaluation_service self._logger = logging.getLogger(__name__) - async def grade_writing_task(self, task: int, data: WritingGradeTaskDTO): - return await self._writing_service.grade_writing_task(task, data.question, data.answer) + async def grade_writing_task( + self, session_id: str, exercise_id: str, + task: int, dto: WritingGradeTaskDTO, background_tasks: BackgroundTasks + ): + await self._evaluation_service.create_or_update_evaluation( + dto.sessionId, dto.exercise_id, EvaluationType.WRITING, task + ) - @handle_exception(400) - async def grade_speaking_task(self, task: int, answers: List[Dict]) -> Dict: - FileHelper.delete_files_older_than_one_day(FilePaths.AUDIO_FILES_PATH) - return await self._speaking_service.grade_speaking_task(task, answers) + await self._evaluation_service.begin_evaluation( + session_id, task, exercise_id, EvaluationType.WRITING, dto, background_tasks + ) + + return Response(status_code=200) + + async def grade_speaking_task(self, task: int, form: FormData, background_tasks: BackgroundTasks): + answers: Dict[int, Dict] = {} + session_id = form.get("sessionId") + exercise_id = form.get("exerciseId") + + if not session_id or not exercise_id: + raise HTTPException( + status_code=400, + detail="Fields sessionId and exerciseId are required!" + ) + + for key, value in form.items(): + if '_' not in key: + continue + + field_name, index = key.rsplit('_', 1) + index = int(index) + + if index not in answers: + answers[index] = {} + + if field_name == 'question': + answers[index]['question'] = value + elif field_name == 'audio': + answers[index]['answer'] = value + + for i, answer in answers.items(): + if 'question' not in answer or 'answer' not in answer: + raise HTTPException( + status_code=400, + detail=f"Incomplete data for answer {i}. Both question and audio required." + ) + + items = [ + GradeSpeakingItem( + question=answers[i]['question'], + answer=answers[i]['answer'] + ) + for i in sorted(answers.keys()) + ] + + ex_type = EvaluationType.SPEAKING if task == 2 else EvaluationType.SPEAKING_INTERACTIVE + + await self._evaluation_service.create_or_update_evaluation( + session_id, exercise_id, ex_type, task + ) + + await self._evaluation_service.begin_evaluation( + session_id, task, exercise_id, ex_type, items, background_tasks + ) + + return Response(status_code=200) + + async def get_evaluations(self, session_id: str, status: str): + return await self._evaluation_service.get_evaluations(session_id, status) async def grade_short_answers(self, data: Dict): return await self._service.grade_short_answers(data) diff --git a/app/dtos/evaluation.py b/app/dtos/evaluation.py new file mode 100644 index 0000000..bfdc37d --- /dev/null +++ b/app/dtos/evaluation.py @@ -0,0 +1,18 @@ +from enum import Enum +from typing import Dict, Optional +from pydantic import BaseModel + + +class EvaluationType(str, Enum): + WRITING = "writing" + SPEAKING_INTERACTIVE = "speaking_interactive" + SPEAKING = "speaking" + +class EvaluationRecord(BaseModel): + id: str + session_id: str + exercise_id: str + type: EvaluationType + task: int + status: str = "pending" + result: Optional[Dict] = None diff --git a/app/dtos/speaking.py b/app/dtos/speaking.py index 439777f..859f77e 100644 --- a/app/dtos/speaking.py +++ b/app/dtos/speaking.py @@ -1,27 +1,13 @@ -import random -from typing import List, Dict, Optional +from typing import List, Dict +from fastapi import UploadFile from pydantic import BaseModel -from app.configs.constants import MinTimers +class Video(BaseModel): + text: str + avatar: str -class SaveSpeakingDTO(BaseModel): - exercises: List[Dict] - minTimer: int = MinTimers.SPEAKING_MIN_TIMER_DEFAULT - - -class GradeSpeakingDTO(BaseModel): +class GradeSpeakingItem(BaseModel): question: str - answer: str - - -class GradeSpeakingAnswersDTO(BaseModel): - answers: List[Dict] - - -class GenerateVideo1DTO(BaseModel): - avatar: str = Optional[str] - questions: List[str] - first_topic: str - second_topic: str + answer: UploadFile diff --git a/app/dtos/writing.py b/app/dtos/writing.py index 5d9caf7..24ce013 100644 --- a/app/dtos/writing.py +++ b/app/dtos/writing.py @@ -2,5 +2,7 @@ from pydantic import BaseModel class WritingGradeTaskDTO(BaseModel): + sessionId: str question: str answer: str + exercise_id: str diff --git a/app/repositories/impl/document_stores/mongo.py b/app/repositories/impl/document_stores/mongo.py index c561ec6..cef2552 100644 --- a/app/repositories/impl/document_stores/mongo.py +++ b/app/repositories/impl/document_stores/mongo.py @@ -34,8 +34,8 @@ class MongoDB(IDocumentStore): cursor = self._mongo_db[collection].find(query) return [document async for document in cursor] - async def update(self, collection: str, filter_query: Dict, update: Dict) -> Optional[str]: - return (await self._mongo_db[collection].update_one(filter_query, update)).upserted_id + async def update(self, collection: str, filter_query: Dict, update: Dict) -> Optional[str]: + return (await self._mongo_db[collection].update_one(filter_query, update, upsert=True)).upserted_id async def get_doc_by_id(self, collection: str, doc_id: str) -> Optional[Dict]: return await self._mongo_db[collection].find_one({"id": doc_id}) diff --git a/app/server.py b/app/server.py index a064fa3..2b4ca19 100644 --- a/app/server.py +++ b/app/server.py @@ -12,7 +12,6 @@ from typing import List from http import HTTPStatus import httpx -import whisper from fastapi import FastAPI, Request from fastapi.encoders import jsonable_encoder from fastapi.exceptions import RequestValidationError @@ -27,6 +26,7 @@ from app.api import router from app.configs import DependencyInjector from app.exceptions import CustomException from app.middlewares import AuthenticationMiddleware, AuthBackend +from app.services.impl import OpenAIWhisper @asynccontextmanager @@ -36,8 +36,6 @@ async def lifespan(_app: FastAPI): https://fastapi.tiangolo.com/advanced/events/ """ - # Whisper model - whisper_model = whisper.load_model("base") # NLTK required datasets download nltk.download('words') @@ -56,11 +54,12 @@ async def lifespan(_app: FastAPI): ) http_client = httpx.AsyncClient() + stt = OpenAIWhisper() DependencyInjector( polly_client, http_client, - whisper_model + stt ).inject() # Setup logging @@ -72,6 +71,7 @@ async def lifespan(_app: FastAPI): yield + stt.close() await http_client.aclose() await polly_client.close() await context_stack.aclose() diff --git a/app/services/abc/__init__.py b/app/services/abc/__init__.py index 368c511..ab7096b 100644 --- a/app/services/abc/__init__.py +++ b/app/services/abc/__init__.py @@ -2,9 +2,11 @@ from .third_parties import * from .exam import * from .training import * from .user import IUserService +from .evaluation import IEvaluationService __all__ = [ - "IUserService" + "IUserService", + "IEvaluationService" ] __all__.extend(third_parties.__all__) __all__.extend(exam.__all__) diff --git a/app/services/abc/evaluation.py b/app/services/abc/evaluation.py new file mode 100644 index 0000000..7568777 --- /dev/null +++ b/app/services/abc/evaluation.py @@ -0,0 +1,33 @@ +from abc import abstractmethod, ABC +from typing import Union, List, Dict + +from fastapi import BackgroundTasks + +from app.dtos.evaluation import EvaluationType + +class IEvaluationService(ABC): + + @abstractmethod + async def create_evaluation( + self, + session_id: str, + exercise_id: str, + eval_type: EvaluationType, + task: int + ): + pass + + @abstractmethod + async def begin_evaluation( + self, + session_id: str, task: int, + exercise_id: str, exercise_type: str, + solution: any, + background_tasks: BackgroundTasks + ): + pass + + @abstractmethod + async def get_evaluations(self, session_id: str, status: str) -> List[Dict]: + pass + diff --git a/app/services/abc/exam/speaking.py b/app/services/abc/exam/speaking.py index 4476a54..696cec9 100644 --- a/app/services/abc/exam/speaking.py +++ b/app/services/abc/exam/speaking.py @@ -11,6 +11,6 @@ class ISpeakingService(ABC): pass @abstractmethod - async def grade_speaking_task(self, task: int, answers: List[Dict]) -> Dict: + async def grade_speaking_task(self, task: int, items: any) -> Dict: pass diff --git a/app/services/impl/exam/evaluation.py b/app/services/impl/exam/evaluation.py new file mode 100644 index 0000000..d7b8a90 --- /dev/null +++ b/app/services/impl/exam/evaluation.py @@ -0,0 +1,108 @@ +import logging +from typing import Union, Dict, List + +from fastapi import BackgroundTasks + +from app.dtos.evaluation import EvaluationType +from app.dtos.speaking import GradeSpeakingItem +from app.dtos.writing import WritingGradeTaskDTO +from app.repositories.abc import IDocumentStore +from app.services.abc import IWritingService, ISpeakingService, IEvaluationService + + +class EvaluationService(IEvaluationService): + + def __init__(self, db: IDocumentStore, writing_service: IWritingService, speaking_service: ISpeakingService): + self._db = db + self._writing_service = writing_service + self._speaking_service = speaking_service + self._logger = logging.getLogger(__name__) + + async def create_evaluation( + self, + session_id: str, + exercise_id: str, + eval_type: EvaluationType, + task: int + ): + await self._db.save_to_db( + "evaluation", + { + "session_id": session_id, + "exercise_id": exercise_id, + "type": eval_type, + "task": task, + "status": "pending" + } + ) + + async def begin_evaluation( + self, + session_id: str, task: int, + exercise_id: str, exercise_type: str, + solution: Union[WritingGradeTaskDTO, List[GradeSpeakingItem]], + background_tasks: BackgroundTasks + ): + background_tasks.add_task( + self._begin_evaluation, + session_id, task, + exercise_id, exercise_type, + solution + ) + + async def _begin_evaluation( + self, session_id: str, task: int, + exercise_id: str, exercise_type: str, + solution: Union[WritingGradeTaskDTO, List[GradeSpeakingItem]] + ): + try: + if exercise_type == EvaluationType.WRITING: + result = await self._writing_service.grade_writing_task( + task, + solution.question, + solution.answer + ) + else: + result = await self._speaking_service.grade_speaking_task( + task, + solution + ) + + await self._db.update( + "evaluation", + { + "exercise_id": exercise_id, + "session_id": session_id, + }, + { + "$set": { + "status": "completed", + "result": result, + } + } + ) + + except Exception as e: + self._logger.error(f"Error processing evaluation {session_id} - {exercise_id}: {str(e)}") + await self._db.update( + "evaluation", + { + "exercise_id": exercise_id, + "session_id": session_id + }, + { + "$set": { + "status": "error", + "error": str(e), + } + } + ) + + async def get_evaluations(self, session_id: str, status: str) -> List[Dict]: + return await self._db.find( + "evaluation", + { + "session_id": session_id, + "status": status + } + ) diff --git a/app/services/impl/exam/level/upload.py b/app/services/impl/exam/level/upload.py index 1dd5cc5..06f2363 100644 --- a/app/services/impl/exam/level/upload.py +++ b/app/services/impl/exam/level/upload.py @@ -1,8 +1,10 @@ +from uuid import uuid4 + import aiofiles import os from logging import getLogger -from typing import Dict, Any, Coroutine, Optional +from typing import Dict, Any, Optional import pdfplumber from fastapi import UploadFile @@ -21,20 +23,19 @@ class UploadLevelModule: self._logger = getLogger(__name__) self._llm = openai - async def generate_level_from_file(self, file: UploadFile, solutions: Optional[UploadFile]) -> Dict[str, Any] | None: - ext, path_id = await FileHelper.save_upload(file) - FileHelper.convert_file_to_pdf( - f'./tmp/{path_id}/upload.{ext}', f'./tmp/{path_id}/exercises.pdf' - ) - file_has_images = self._check_pdf_for_images(f'./tmp/{path_id}/exercises.pdf') + async def generate_level_from_file(self, exercises: UploadFile, solutions: Optional[UploadFile]) -> Dict[str, Any] | None: + path_id = str(uuid4()) + ext, _ = await FileHelper.save_upload(exercises, "exercises", path_id) + FileHelper.convert_file_to_html(f'./tmp/{path_id}/exercises.{ext}', f'./tmp/{path_id}/exercises.html') - if not file_has_images: - FileHelper.convert_file_to_html(f'./tmp/{path_id}/upload.{ext}', f'./tmp/{path_id}/exercises.html') + if solutions: + ext, _ = await FileHelper.save_upload(solutions, "solutions", path_id) + FileHelper.convert_file_to_html(f'./tmp/{path_id}/solutions.{ext}', f'./tmp/{path_id}/solutions.html') - completion: Coroutine[Any, Any, Exam] = ( - self._png_completion(path_id) if file_has_images else self._html_completion(path_id) - ) - response = await completion + #completion: Coroutine[Any, Any, Exam] = ( + # self._png_completion(path_id) if file_has_images else self._html_completion(path_id) + #) + response = await self._html_completion(path_id) FileHelper.remove_directory(f'./tmp/{path_id}') @@ -42,6 +43,7 @@ class UploadLevelModule: return self.fix_ids(response.model_dump(exclude_none=True)) return None + @staticmethod @suppress_loggers() def _check_pdf_for_images(pdf_path: str) -> bool: diff --git a/app/services/impl/exam/reading/import_reading.py b/app/services/impl/exam/reading/import_reading.py index c8a7cd5..32eca34 100644 --- a/app/services/impl/exam/reading/import_reading.py +++ b/app/services/impl/exam/reading/import_reading.py @@ -98,7 +98,7 @@ class ImportReadingModule: ] } ], - "text": "{{}}\\n>", + "text": "{{}}\\\\n] notice how there is a double backslash before the n -> I want an escaped newline in your output> ", "type": "writeBlanks", "prompt": "" } diff --git a/app/services/impl/exam/speaking.py b/app/services/impl/exam/speaking.py index 0a0c918..7706592 100644 --- a/app/services/impl/exam/speaking.py +++ b/app/services/impl/exam/speaking.py @@ -1,6 +1,7 @@ +import asyncio import logging import os -import random +import aiofiles import re import uuid from typing import Dict, List, Optional @@ -9,6 +10,7 @@ from app.configs.constants import ( FieldsAndExercises, GPTModels, TemperatureSettings, FilePaths ) +from app.dtos.speaking import GradeSpeakingItem from app.helpers import TextHelper from app.repositories.abc import IFileStorage, IDocumentStore from app.services.abc import ISpeakingService, ILLMService, IVideoGeneratorService, ISpeechToTextService @@ -165,105 +167,110 @@ class SpeakingService(ISpeakingService): return response - async def grade_speaking_task(self, task: int, answers: List[Dict]) -> Dict: - request_id = uuid.uuid4() - self._logger.info( - f'POST - speaking_task_{task} - Received request to grade speaking task {task}. ' - f'Use this id to track the logs: {str(request_id)} - Request data: {str(answers)}' - ) - - text_answers = [] - perfect_answers = [] + async def grade_speaking_task(self, task: int, items: List[GradeSpeakingItem]) -> Dict: + request_id = str(uuid.uuid4()) + self._log(task, request_id, f"Received request to grade speaking task {task}.") if task != 2: - self._logger.info( - f'POST - speaking_task_{task} - {str(request_id)} - Received {str(len(answers))} total answers.' - ) + self._log(task, request_id, f'Received {len(items)} total answers.') - for item in answers: - sound_file_name = FilePaths.AUDIO_FILES_PATH + str(uuid.uuid4()) + temp_files = [] + try: + # Save all files first + temp_files = await asyncio.gather(*[ + self.save_file(item) for item in items + ]) - self._logger.info(f'POST - speaking_task_{task} - {request_id} - Downloading file {item["answer"]}') + # Process all transcriptions concurrently (up to 4) + self._log(task, request_id, 'Starting batch transcription') + text_answers = await asyncio.gather(*[ + self._stt.speech_to_text(file_path) + for file_path in temp_files + ]) - await self._file_storage.download_firebase_file(item["answer"], sound_file_name) + for answer in text_answers: + self._log(task, request_id, f'Transcribed answer: {answer}') + if not TextHelper.has_x_words(answer, 20): + self._log( + task, request_id, + f'The answer had less words than threshold 20 to be graded. Answer: {answer}' + ) + return self._zero_rating("The audio recorded does not contain enough english words to be graded.") - self._logger.info( - f'POST - speaking_task_{task} - {request_id} - ' - f'Downloaded file {item["answer"]} to {sound_file_name}' - ) + # Get perfect answers + self._log(task, request_id, 'Requesting perfect answers') + perfect_answers = await asyncio.gather(*[ + self._get_perfect_answer(task, item.question) + for item in items + ]) - answer_text = await self._stt.speech_to_text(sound_file_name) - self._logger.info(f'POST - speaking_task_{task} - {request_id} - Transcripted answer: {answer_text}') + # Format the responses + if task in {1, 3}: + self._log(task, request_id, 'Formatting answers and questions for prompt.') - text_answers.append(answer_text) - item["answer"] = answer_text - os.remove(sound_file_name) + formatted_text = "" + for i, (item, transcribed_answer) in enumerate(zip(items, text_answers), start=1): + formatted_text += f"**Question {i}:**\n{item.question}\n\n" + formatted_text += f"**Answer {i}:**\n{transcribed_answer}\n\n" - # TODO: This will end the grading of all answers if a single one does not have enough words - # don't know if this is intended - if not TextHelper.has_x_words(answer_text, 20): - self._logger.info( - f'POST - speaking_task_{task} - {request_id} - ' - f'The answer had less words than threshold 20 to be graded. Answer: {answer_text}' - ) - return self._zero_rating("The audio recorded does not contain enough english words to be graded.") + self._log(task, request_id, f'Formatted answers and questions for prompt: {formatted_text}') + questions_and_answers = f'\n\n The questions and answers are: \n\n{formatted_text}' + else: + questions_and_answers = f'\n Question: "{items[0].question}" \n Answer: "{text_answers[0]}"' - self._logger.info( - f'POST - speaking_task_{task} - {request_id} - ' - f'Requesting perfect answer for question: {item["question"]}' - ) - perfect_answers.append(await self._get_perfect_answer(task, item["question"])) + self._log(task, request_id, 'Requesting grading of the answer(s).') + response = await self._grade_task(task, questions_and_answers) + self._log(task, request_id, f'Answer(s) graded: {response}') - if task in {1, 3}: - self._logger.info( - f'POST - speaking_task_{task} - {request_id} - Formatting answers and questions for prompt.' - ) + if task in {1, 3}: + self._log(task, request_id, 'Adding perfect answer(s) to response.') - formatted_text = "" - for i, entry in enumerate(answers, start=1): - formatted_text += f"**Question {i}:**\n{entry['question']}\n\n" - formatted_text += f"**Answer {i}:**\n{entry['answer']}\n\n" + # TODO: check if it is answer["answer"] instead + for i, answer in enumerate(perfect_answers, start=1): + response['perfect_answer_' + str(i)] = answer - self._logger.info( - f'POST - speaking_task_{task} - {request_id} - ' - f'Formatted answers and questions for prompt: {formatted_text}' - ) - questions_and_answers = f'\n\n The questions and answers are: \n\n{formatted_text}' - else: - questions_and_answers = f'\n Question: "{answers[0]["question"]}" \n Answer: "{answers[0]["answer"]}"' + self._log(task, request_id, 'Getting speaking corrections in parallel') + # Get all corrections in parallel + fixed_texts = await asyncio.gather(*[ + self._get_speaking_corrections(answer) + for answer in text_answers + ]) - self._logger.info(f'POST - speaking_task_{task} - {request_id} - Requesting grading of the answer(s).') - response = await self._grade_task(task, questions_and_answers) + self._log(task, request_id, 'Adding transcript and fixed texts to response.') + for i, (answer, fixed) in enumerate(zip(text_answers, fixed_texts), start=1): + response['transcript_' + str(i)] = answer + response['fixed_text_' + str(i)] = fixed + else: + response['transcript'] = text_answers[0] - self._logger.info(f'POST - speaking_task_{task} - {request_id} - Answer(s) graded: {response}') + self._log(task, request_id, 'Requesting fixed text.') + response['fixed_text'] = await self._get_speaking_corrections(text_answers[0]) + self._log(task, request_id, f'Fixed text: {response["fixed_text"]}') - if task in {1, 3}: - self._logger.info( - f'POST - speaking_task_{task} - {request_id} - Adding perfect answer(s) to response.') + response['perfect_answer'] = perfect_answers[0]["answer"] - # TODO: check if it is answer["answer"] instead - for i, answer in enumerate(perfect_answers, start=1): - response['perfect_answer_' + str(i)] = answer + response["overall"] = self._fix_speaking_overall(response["overall"], response["task_response"]) + self._log(task, request_id, f'Final response: {response}') + return response - self._logger.info( - f'POST - speaking_task_{task} - {request_id} - Adding transcript and fixed texts to response.' - ) + finally: + for file_path in temp_files: + try: + if os.path.exists(file_path): + os.remove(file_path) + except Exception as e: + self._log(task, request_id, f'Error cleaning up temp file {file_path}: {str(e)}') - for i, answer in enumerate(text_answers, start=1): - response['transcript_' + str(i)] = answer - response['fixed_text_' + str(i)] = await self._get_speaking_corrections(answer) - else: - response['transcript'] = answers[0]["answer"] + def _log(self, task: int, request_id: str, message: str): + self._logger.info(f'POST - speaking_task_{task} - {request_id} - {message}') - self._logger.info(f'POST - speaking_task_{task} - {request_id} - Requesting fixed text.') - response['fixed_text'] = await self._get_speaking_corrections(answers[0]["answer"]) - self._logger.info(f'POST - speaking_task_{task} - {request_id} - Fixed text: {response["fixed_text"]}') - - response['perfect_answer'] = perfect_answers[0]["answer"] - - response["overall"] = self._fix_speaking_overall(response["overall"], response["task_response"]) - self._logger.info(f'POST - speaking_task_{task} - {request_id} - Final response: {response}') - return response + @staticmethod + async def save_file(item: GradeSpeakingItem) -> str: + sound_file_name = FilePaths.AUDIO_FILES_PATH + str(uuid.uuid4()) + content = await item.answer.read() + async with aiofiles.open(sound_file_name, 'wb') as f: + await f.write(content) + return sound_file_name # ================================================================================================================== # grade_speaking_task helpers @@ -336,7 +343,7 @@ class SpeakingService(ISpeakingService): { "role": "user", "content": ( - 'For pronunciations act as if you heard the answers and they were transcripted ' + 'For pronunciations act as if you heard the answers and they were transcribed ' 'as you heard them.' ) }, diff --git a/app/services/impl/exam/writing.py b/app/services/impl/exam/writing.py index 9664d2a..d234869 100644 --- a/app/services/impl/exam/writing.py +++ b/app/services/impl/exam/writing.py @@ -1,3 +1,4 @@ +import asyncio from typing import List, Dict from app.services.abc import IWritingService, ILLMService, IAIDetectorService @@ -126,7 +127,7 @@ class WritingService(IWritingService): TemperatureSettings.GEN_QUESTION_TEMPERATURE ) - response = await self._llm.prediction( + evaluation_promise = self._llm.prediction( llm_model, messages, ["comment"], @@ -134,15 +135,27 @@ class WritingService(IWritingService): ) perfect_answer_minimum = 150 if task == 1 else 250 - perfect_answer = await self._get_perfect_answer(question, perfect_answer_minimum) + perfect_answer_promise = self._get_perfect_answer(question, perfect_answer_minimum) + fixed_text_promise = self._get_fixed_text(answer) + ai_detection_promise = self._ai_detector.run_detection(answer) - response["perfect_answer"] = perfect_answer["perfect_answer"] - response["overall"] = ExercisesHelper.fix_writing_overall(response["overall"], response["task_response"]) - response['fixed_text'] = await self._get_fixed_text(answer) + prediction_result, perfect_answer_result, fixed_text_result, ai_detection_result = await asyncio.gather( + evaluation_promise, + perfect_answer_promise, + fixed_text_promise, + ai_detection_promise + ) - ai_detection = await self._ai_detector.run_detection(answer) - if ai_detection is not None: - response['ai_detection'] = ai_detection + response = prediction_result + response["perfect_answer"] = perfect_answer_result["perfect_answer"] + response["overall"] = ExercisesHelper.fix_writing_overall( + response["overall"], + response["task_response"] + ) + response['fixed_text'] = fixed_text_result + + if ai_detection_result is not None: + response['ai_detection'] = ai_detection_result return response diff --git a/app/services/impl/third_parties/whisper.py b/app/services/impl/third_parties/whisper.py index ca87070..d74dedc 100644 --- a/app/services/impl/third_parties/whisper.py +++ b/app/services/impl/third_parties/whisper.py @@ -1,22 +1,66 @@ import os - -from fastapi.concurrency import run_in_threadpool - +import threading +import whisper +import asyncio +from concurrent.futures import ThreadPoolExecutor +from typing import Dict from whisper import Whisper + from app.services.abc import ISpeechToTextService - +""" + The whisper model is not thread safe, a thread pool + with 4 whisper models will be created so it can + process up to 4 transcriptions at a time. + + The base model requires ~1GB so 4 instances is the safe bet: + https://github.com/openai/whisper?tab=readme-ov-file#available-models-and-languages +""" class OpenAIWhisper(ISpeechToTextService): + def __init__(self, model_name: str = "base", num_models: int = 4): + self._model_name = model_name + self._num_models = num_models + self._models: Dict[int, 'Whisper'] = {} + self._lock = threading.Lock() + self._next_model_id = 0 + self._is_closed = False - def __init__(self, model: Whisper): - self._model = model + for i in range(num_models): + self._models[i] = whisper.load_model(self._model_name) - async def speech_to_text(self, file_path): - if os.path.exists(file_path): - result = await run_in_threadpool( - self._model.transcribe, file_path, fp16=False, language='English', verbose=False - ) - return result["text"] - else: - print("File not found:", file_path) - raise Exception("File " + file_path + " not found.") + self._executor = ThreadPoolExecutor( + max_workers=num_models, + thread_name_prefix="whisper_worker" + ) + + def get_model(self) -> 'Whisper': + with self._lock: + model_id = self._next_model_id + self._next_model_id = (self._next_model_id + 1) % self._num_models + return self._models[model_id] + + async def speech_to_text(self, file_path: str) -> str: + if not os.path.exists(file_path): + raise FileNotFoundError(f"File {file_path} not found.") + + def transcribe(): + model = self.get_model() + return model.transcribe( + file_path, + fp16=False, + language='English', + verbose=False + )["text"] + + loop = asyncio.get_running_loop() + return await loop.run_in_executor(self._executor, transcribe) + + def close(self): + with self._lock: + if not self._is_closed: + self._is_closed = True + if self._executor: + self._executor.shutdown(wait=True, cancel_futures=True) + + def __del__(self): + self.close()