Merged in release/async (pull request #36)

Release/async

Approved-by: Tiago Ribeiro
This commit is contained in:
carlos.mesquita
2024-11-10 10:47:48 +00:00
committed by Tiago Ribeiro
43 changed files with 1003 additions and 867 deletions

View File

@@ -18,6 +18,7 @@ async def generate_exercises(
dto: LevelExercisesDTO,
level_controller: ILevelController = Depends(Provide[controller])
):
print(dto.dict())
return await level_controller.generate_exercises(dto)
@level_router.get(
@@ -47,11 +48,12 @@ async def get_level_utas(
dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))]
)
@inject
async def upload(
file: UploadFile,
async def import_level(
exercises: UploadFile,
solutions: UploadFile = None,
level_controller: ILevelController = Depends(Provide[controller])
):
return await level_controller.upload_level(file)
return await level_controller.upload_level(exercises, solutions)
@level_router.post(

View File

@@ -1,7 +1,7 @@
import random
from dependency_injector.wiring import Provide, inject
from fastapi import APIRouter, Depends, Path, Query
from fastapi import APIRouter, Depends, Path, Query, UploadFile
from app.middlewares import Authorized, IsAuthenticatedViaBearerToken
from app.controllers.abc import IListeningController
@@ -11,6 +11,19 @@ from app.dtos.listening import SaveListeningDTO, GenerateListeningExercises, Dia
controller = "listening_controller"
listening_router = APIRouter()
@listening_router.post(
'/import',
dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))]
)
@inject
async def upload(
exercises: UploadFile,
solutions: UploadFile = None,
listening_controller: IListeningController = Depends(Provide[controller])
):
return await listening_controller.import_exam(exercises, solutions)
@listening_router.get(
'/{section}',
dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))]
@@ -49,15 +62,3 @@ async def generate_listening_exercise(
listening_controller: IListeningController = Depends(Provide[controller])
):
return await listening_controller.get_listening_question(section, dto)
@listening_router.post(
'/',
dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))]
)
@inject
async def save_listening(
data: SaveListeningDTO,
listening_controller: IListeningController = Depends(Provide[controller])
):
return await listening_controller.save_listening(data)

View File

@@ -1,8 +1,10 @@
import random
from typing import Optional
from dependency_injector.wiring import Provide, inject
from fastapi import APIRouter, Depends, Path, Query, UploadFile
from app.configs.constants import EducationalContent
from app.dtos.reading import ReadingDTO
from app.middlewares import Authorized, IsAuthenticatedViaBearerToken
from app.controllers.abc import IReadingController
@@ -21,8 +23,6 @@ async def upload(
solutions: UploadFile = None,
reading_controller: IReadingController = Depends(Provide[controller])
):
print(exercises.filename)
#print(solutions.filename)
return await reading_controller.import_exam(exercises, solutions)
@reading_router.get(
@@ -36,6 +36,7 @@ async def generate_passage(
passage: int = Path(..., ge=1, le=3),
reading_controller: IReadingController = Depends(Provide[controller])
):
topic = random.choice(EducationalContent.TOPICS) if not topic else topic
return await reading_controller.generate_reading_passage(passage, topic, word_count)
@reading_router.post(

View File

@@ -2,31 +2,57 @@ import random
from typing import Optional
from dependency_injector.wiring import inject, Provide
from fastapi import APIRouter, Path, Query, Depends, BackgroundTasks
from fastapi import APIRouter, Path, Query, Depends
from pydantic import BaseModel
from app.dtos.video import Task, TaskStatus
from app.middlewares import Authorized, IsAuthenticatedViaBearerToken
from app.configs.constants import EducationalContent
from app.controllers.abc import ISpeakingController
from app.dtos.speaking import (
SaveSpeakingDTO, GenerateVideo1DTO, GenerateVideo2DTO, GenerateVideo3DTO
)
controller = "speaking_controller"
speaking_router = APIRouter()
@speaking_router.get(
'/1',
class Video(BaseModel):
text: str
avatar: str
@speaking_router.post(
'/media',
dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))]
)
@inject
async def get_speaking_task(
first_topic: str = Query(default=random.choice(EducationalContent.MTI_TOPICS)),
second_topic: str = Query(default=random.choice(EducationalContent.MTI_TOPICS)),
difficulty: str = Query(default=random.choice(EducationalContent.DIFFICULTIES)),
async def generate_video(
video: Video,
speaking_controller: ISpeakingController = Depends(Provide[controller])
):
return await speaking_controller.get_speaking_part(1, first_topic, difficulty, second_topic)
return await speaking_controller.generate_video(video.text, video.avatar)
@speaking_router.get(
'/media/{vid_id}',
dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))]
)
@inject
async def poll_video(
vid_id: str = Path(...),
speaking_controller: ISpeakingController = Depends(Provide[controller])
):
return await speaking_controller.poll_video(vid_id)
@speaking_router.get(
'/avatars',
dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))]
)
@inject
async def get_avatars(
speaking_controller: ISpeakingController = Depends(Provide[controller])
):
return await speaking_controller.get_avatars()
@speaking_router.get(
@@ -35,64 +61,17 @@ async def get_speaking_task(
)
@inject
async def get_speaking_task(
task: int = Path(..., ge=2, le=3),
topic: str = Query(default=random.choice(EducationalContent.MTI_TOPICS)),
task: int = Path(..., ge=1, le=3),
topic: Optional[str] = Query(None),
first_topic: Optional[str] = Query(None),
second_topic: Optional[str] = Query(None),
difficulty: str = Query(default=random.choice(EducationalContent.DIFFICULTIES)),
speaking_controller: ISpeakingController = Depends(Provide[controller])
):
return await speaking_controller.get_speaking_part(task, topic, difficulty)
if not second_topic:
topic_or_first_topic = topic if topic else random.choice(EducationalContent.MTI_TOPICS)
else:
topic_or_first_topic = first_topic if first_topic else random.choice(EducationalContent.MTI_TOPICS)
@speaking_router.post(
'/',
dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))]
)
@inject
async def save_speaking(
data: SaveSpeakingDTO,
background_tasks: BackgroundTasks,
speaking_controller: ISpeakingController = Depends(Provide[controller])
):
return await speaking_controller.save_speaking(data, background_tasks)
@speaking_router.post(
'/generate_video/1',
dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))]
)
@inject
async def generate_video_1(
data: GenerateVideo1DTO,
speaking_controller: ISpeakingController = Depends(Provide[controller])
):
return await speaking_controller.generate_video(
1, data.avatar, data.first_topic, data.questions, second_topic=data.second_topic
)
@speaking_router.post(
'/generate_video/2',
dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))]
)
@inject
async def generate_video_2(
data: GenerateVideo2DTO,
speaking_controller: ISpeakingController = Depends(Provide[controller])
):
return await speaking_controller.generate_video(
2, data.avatar, data.topic, [data.question], prompts=data.prompts, suffix=data.suffix
)
@speaking_router.post(
'/generate_video/3',
dependencies=[Depends(Authorized([IsAuthenticatedViaBearerToken]))]
)
@inject
async def generate_video_3(
data: GenerateVideo3DTO,
speaking_controller: ISpeakingController = Depends(Provide[controller])
):
return await speaking_controller.generate_video(
3, data.avatar, data.topic, data.questions
)
second_topic = second_topic if second_topic else random.choice(EducationalContent.MTI_TOPICS)
return await speaking_controller.get_speaking_part(task, topic_or_first_topic, second_topic, difficulty)

View File

@@ -99,87 +99,6 @@ class QuestionType(Enum):
READING_PASSAGE_3 = "Reading Passage 3"
class HeygenAvatars(Enum):
MATTHEW_NOAH = "5912afa7c77c47d3883af3d874047aaf"
VERA_CERISE = "9e58d96a383e4568a7f1e49df549e0e4"
EDWARD_TONY = "d2cdd9c0379a4d06ae2afb6e5039bd0c"
TANYA_MOLLY = "045cb5dcd00042b3a1e4f3bc1c12176b"
KAYLA_ABBI = "1ae1e5396cc444bfad332155fdb7a934"
JEROME_RYAN = "0ee6aa7cc1084063a630ae514fccaa31"
TYLER_CHRISTOPHER = "5772cff935844516ad7eeff21f839e43"
from enum import Enum
class ELAIAvatars(Enum):
# Works
GIA_BUSINESS = {
"avatar_code": "gia.business",
"avatar_gender": "female",
"avatar_url": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/gia/business/gia_business.png",
"avatar_canvas": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/gia/business/gia_business.png",
"voice_id": "EXAVITQu4vr4xnSDxMaL",
"voice_provider": "elevenlabs"
}
# Works
VADIM_BUSINESS = {
"avatar_code": "vadim.business",
"avatar_gender": "male",
"avatar_url": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/vadim/business/vadim_business.png",
"avatar_canvas": "https://d3u63mhbhkevz8.cloudfront.net/common/vadim/business/vadim_business.png",
"voice_id": "flq6f7yk4E4fJM5XTYuZ",
"voice_provider": "elevenlabs"
}
ORHAN_BUSINESS = {
"avatar_code": "orhan.business",
"avatar_gender": "male",
"avatar_url": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/orhan/business/orhan.png",
"avatar_canvas": "https://d3u63mhbhkevz8.cloudfront.net/common/orhan/business/orhan.png",
"voice_id": "en-US-AndrewMultilingualNeural",
"voice_provider": "azure"
}
FLORA_BUSINESS = {
"avatar_code": "flora.business",
"avatar_gender": "female",
"avatar_url": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/flora/business/flora_business.png",
"avatar_canvas": "https://d3u63mhbhkevz8.cloudfront.net/common/flora/business/flora_business.png",
"voice_id": "en-US-JaneNeural",
"voice_provider": "azure"
}
SCARLETT_BUSINESS = {
"avatar_code": "scarlett.business",
"avatar_gender": "female",
"avatar_url": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/scarlett/business/scarlett_business.png",
"avatar_canvas": "https://d3u63mhbhkevz8.cloudfront.net/common/scarlett/business/scarlett_business.png",
"voice_id": "en-US-NancyNeural",
"voice_provider": "azure"
}
PARKER_CASUAL = {
"avatar_code": "parker.casual",
"avatar_gender": "male",
"avatar_url": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/parker/casual/parker_casual.png",
"avatar_canvas": "https://d3u63mhbhkevz8.cloudfront.net/common/parker/casual/parker_casual.png",
"voice_id": "en-US-TonyNeural",
"voice_provider": "azure"
}
ETHAN_BUSINESS = {
"avatar_code": "ethan.business",
"avatar_gender": "male",
"avatar_url": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/ethan/business/ethan_business_low.png",
"avatar_canvas": "https://d3u63mhbhkevz8.cloudfront.net/common/ethan/business/ethan_business_low.png",
"voice_id": "en-US-JasonNeural",
"voice_provider": "azure"
}
class FilePaths:
AUDIO_FILES_PATH = 'download-audio/'
FIREBASE_LISTENING_AUDIO_FILES_PATH = 'listening_recordings/'

View File

@@ -44,11 +44,17 @@ class DependencyInjector:
self._container.llm = providers.Factory(OpenAI, client=self._container.openai_client)
self._container.stt = providers.Factory(OpenAIWhisper, model=self._container.whisper_model)
self._container.tts = providers.Factory(AWSPolly, client=self._container.polly_client)
with open('app/services/impl/third_parties/elai/elai_conf.json', 'r') as file:
with open('app/services/impl/third_parties/elai/conf.json', 'r') as file:
elai_conf = json.load(file)
with open('app/services/impl/third_parties/elai/avatars.json', 'r') as file:
elai_avatars = json.load(file)
with open('app/services/impl/third_parties/heygen/avatars.json', 'r') as file:
heygen_avatars = json.load(file)
self._container.vid_gen = providers.Factory(
Heygen, client=self._container.http_client, token=os.getenv("HEY_GEN_TOKEN")
Heygen, client=self._container.http_client, token=os.getenv("HEY_GEN_TOKEN"), avatars=heygen_avatars
)
self._container.ai_detector = providers.Factory(
GPTZero, client=self._container.http_client, gpt_zero_key=os.getenv("GPT_ZERO_API_KEY")
@@ -79,8 +85,8 @@ class DependencyInjector:
self._container.reading_service = providers.Factory(ReadingService, llm=self._container.llm)
self._container.speaking_service = providers.Factory(
SpeakingService, llm=self._container.llm, vid_gen=self._container.vid_gen,
file_storage=self._container.firebase_instance, document_store=self._container.document_store,
SpeakingService, llm=self._container.llm,
file_storage=self._container.firebase_instance,
stt=self._container.stt
)
@@ -144,7 +150,7 @@ class DependencyInjector:
)
self._container.speaking_controller = providers.Factory(
SpeakingController, speaking_service=self._container.speaking_service
SpeakingController, speaking_service=self._container.speaking_service, vid_gen=self._container.vid_gen
)
self._container.writing_controller = providers.Factory(

View File

@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from fastapi import UploadFile
from typing import Dict
from typing import Dict, Optional
class ILevelController(ABC):
@@ -19,7 +19,7 @@ class ILevelController(ABC):
pass
@abstractmethod
async def upload_level(self, file: UploadFile):
async def upload_level(self, file: UploadFile, solutions: Optional[UploadFile] = None):
pass
@abstractmethod

View File

@@ -1,9 +1,14 @@
from abc import ABC, abstractmethod
from typing import List
from fastapi import UploadFile
class IListeningController(ABC):
@abstractmethod
async def import_exam(self, exercises: UploadFile, solutions: UploadFile = None):
pass
@abstractmethod
async def generate_listening_dialog(self, section_id: int, topic: str, difficulty: str):
pass
@@ -15,7 +20,3 @@ class IListeningController(ABC):
@abstractmethod
async def generate_mp3(self, dto):
pass
@abstractmethod
async def save_listening(self, data):
pass

View File

@@ -7,19 +7,17 @@ from fastapi import BackgroundTasks
class ISpeakingController(ABC):
@abstractmethod
async def get_speaking_part(self, task: int, topic: str, difficulty: str, second_topic: Optional[str] = None):
async def get_speaking_part(self, task: int, topic: str, second_topic: str, difficulty: str):
pass
@abstractmethod
async def save_speaking(self, data, background_tasks: BackgroundTasks):
async def get_avatars(self):
pass
@abstractmethod
async def generate_video(
self, part: int, avatar: str, topic: str, questions: list[str],
*,
second_topic: Optional[str] = None,
prompts: Optional[list[str]] = None,
suffix: Optional[str] = None,
):
async def generate_video(self, text: str, avatar: str):
pass
@abstractmethod
async def poll_video(self, vid_id: str):
pass

View File

@@ -1,5 +1,5 @@
from fastapi import UploadFile
from typing import Dict
from typing import Dict, Optional
from watchfiles import awatch
@@ -21,8 +21,8 @@ class LevelController(ILevelController):
async def get_level_utas(self):
return await self._service.get_level_utas()
async def upload_level(self, file: UploadFile):
return await self._service.upload_level(file)
async def upload_level(self, exercises: UploadFile, solutions: Optional[UploadFile] = None):
return await self._service.upload_level(exercises, solutions)
async def get_custom_level(self, data: Dict):
return await self._service.get_custom_level(data)

View File

@@ -1,7 +1,11 @@
import io
from fastapi import UploadFile
from starlette.responses import StreamingResponse, Response
from app.controllers.abc import IListeningController
from app.dtos.listening import SaveListeningDTO, GenerateListeningExercises, Dialog
from app.dtos.listening import GenerateListeningExercises, Dialog
from app.services.abc import IListeningService
from fastapi import Response
class ListeningController(IListeningController):
@@ -9,6 +13,13 @@ class ListeningController(IListeningController):
def __init__(self, listening_service: IListeningService):
self._service = listening_service
async def import_exam(self, exercises: UploadFile, solutions: UploadFile = None):
res = await self._service.import_exam(exercises, solutions)
if not res:
return Response(status_code=500)
else:
return res
async def generate_listening_dialog(self, section_id: int, topic: str, difficulty: str):
return await self._service.generate_listening_dialog(section_id, topic, difficulty)
@@ -17,13 +28,12 @@ class ListeningController(IListeningController):
async def generate_mp3(self, dto: Dialog):
mp3 = await self._service.generate_mp3(dto)
return Response(
content=mp3,
return StreamingResponse(
content=io.BytesIO(mp3),
media_type="audio/mpeg",
headers={
"Content-Type": "audio/mpeg",
"Content-Disposition": "attachment;filename=speech.mp3"
}
)
async def save_listening(self, data: SaveListeningDTO):
return await self._service.save_listening(data.parts, data.minTimer, data.difficulty, data.id)

View File

@@ -1,8 +1,7 @@
import logging
from typing import Optional
from fastapi import UploadFile
from grpc import services
from fastapi import UploadFile, Response
from app.controllers.abc import IReadingController
from app.dtos.reading import ReadingDTO
@@ -16,7 +15,11 @@ class ReadingController(IReadingController):
self._logger = logging.getLogger(__name__)
async def import_exam(self, exercises: UploadFile, solutions: UploadFile = None):
return await self._service.import_exam(exercises, solutions)
res = await self._service.import_exam(exercises, solutions)
if not res:
return Response(status_code=500)
else:
return res
async def generate_reading_passage(self, passage: int, topic: Optional[str], word_count: Optional[int]):
return await self._service.generate_reading_passage(passage, topic, word_count)

View File

@@ -1,47 +1,26 @@
import logging
import uuid
import random
from typing import Optional
from fastapi import BackgroundTasks
from app.controllers.abc import ISpeakingController
from app.dtos.speaking import SaveSpeakingDTO
from app.services.abc import ISpeakingService
from app.configs.constants import ExamVariant, MinTimers
from app.configs.question_templates import getSpeakingTemplate
from app.services.abc import ISpeakingService, IVideoGeneratorService
class SpeakingController(ISpeakingController):
def __init__(self, speaking_service: ISpeakingService):
def __init__(self, speaking_service: ISpeakingService, vid_gen: IVideoGeneratorService):
self._service = speaking_service
self._vid_gen = vid_gen
self._logger = logging.getLogger(__name__)
async def get_speaking_part(self, task: int, topic: str, difficulty: str, second_topic: Optional[str] = None):
return await self._service.get_speaking_part(task, topic, difficulty, second_topic)
async def get_speaking_part(self, task: int, topic: str, second_topic: str, difficulty: str):
return await self._service.get_speaking_part(task, topic, second_topic, difficulty)
async def save_speaking(self, data: SaveSpeakingDTO, background_tasks: BackgroundTasks):
exercises = data.exercises
min_timer = data.minTimer
async def get_avatars(self):
return await self._vid_gen.get_avatars()
template = getSpeakingTemplate()
template["minTimer"] = min_timer
async def generate_video(self, text: str, avatar: str):
return await self._vid_gen.create_video(text, avatar)
if min_timer < MinTimers.SPEAKING_MIN_TIMER_DEFAULT:
template["variant"] = ExamVariant.PARTIAL.value
else:
template["variant"] = ExamVariant.FULL.value
req_id = str(uuid.uuid4())
self._logger.info(f'Received request to save speaking with id: {req_id}')
background_tasks.add_task(self._service.create_videos_and_save_to_db, exercises, template, req_id)
self._logger.info('Started background task to save speaking.')
# Return response without waiting for create_videos_and_save_to_db to finish
return {**template, "id": req_id}
async def generate_video(self, *args, **kwargs):
return await self._service.generate_video(*args, **kwargs)
async def poll_video(self, vid_id: str):
return await self._vid_gen.poll_status(vid_id)

View File

@@ -0,0 +1,80 @@
from enum import Enum
from pydantic import BaseModel, Field
from typing import List, Union, Optional, Literal
from uuid import uuid4, UUID
class ExerciseBase(BaseModel):
id: UUID = Field(default_factory=uuid4)
type: str
prompt: str
class TrueFalseSolution(str, Enum):
TRUE = "true"
FALSE = "false"
NOT_GIVEN = "not_given"
class TrueFalseQuestions(BaseModel):
prompt: str
solution: TrueFalseSolution
id: str
class TrueFalseExercise(ExerciseBase):
type: Literal["trueFalse"]
questions: List[TrueFalseQuestions]
class MCOption(BaseModel):
id: str
text: str
class MCQuestion(BaseModel):
id: str
prompt: str
options: List[MCOption]
solution: str
variant: str = "text"
class MultipleChoiceExercise(ExerciseBase):
type: Literal["multipleChoice"]
questions: List[MCQuestion]
class WriteBlanksVariant(str, Enum):
QUESTIONS = "questions"
FILL = "fill"
FORM = "form"
class WriteBlankSolution(BaseModel):
id: str
solution: List[str]
class WriteBlanksExercise(ExerciseBase):
type: Literal["writeBlanks"]
maxWords: int
solutions: List[WriteBlankSolution]
text: str
variant: Optional[WriteBlanksVariant]
ListeningExercise = Union[
TrueFalseExercise,
MultipleChoiceExercise,
WriteBlanksExercise
]
class ListeningSection(BaseModel):
exercises: List[ListeningExercise]
class ListeningExam(BaseModel):
module: str = "listening"
minTimer: Optional[int]
sections: List[ListeningSection]

View File

@@ -1,7 +1,7 @@
from enum import Enum
from pydantic import BaseModel, Field
from typing import List, Union
from typing import List, Union, Optional
from uuid import uuid4, UUID
@@ -15,10 +15,7 @@ class WriteBlanksExercise(BaseModel):
maxWords: int
solutions: List[WriteBlanksSolution]
text: str
@property
def prompt(self) -> str:
return f"Choose no more than {self.maxWords} words and/or a number from the passage for each answer."
prompt: str
class MatchSentencesOption(BaseModel):
@@ -32,20 +29,29 @@ class MatchSentencesVariant(str, Enum):
HEADING = "heading"
IDEAMATCH = "ideaMatch"
class MCOption(BaseModel):
id: str
text: str
class MCQuestion(BaseModel):
id: str
prompt: str
options: List[MCOption]
solution: str
variant: Optional[str] = None
class MultipleChoice(BaseModel):
questions: List[MCQuestion]
type: str
prompt: str
class MatchSentencesExercise(BaseModel):
options: List[MatchSentencesOption]
sentences: List[MatchSentencesSentence]
type: str = "matchSentences"
variant: MatchSentencesVariant
@property
def prompt(self) -> str:
return (
"Choose the correct heading for paragraphs from the list of headings below."
if self.variant == MatchSentencesVariant.HEADING else
"Choose the correct author for the ideas/opinions from the list of authors below."
)
prompt: str
class TrueFalseSolution(str, Enum):
TRUE = "true"
@@ -80,18 +86,9 @@ class FillBlanksExercise(BaseModel):
type: str = "fillBlanks"
words: List[FillBlanksWord]
allowRepetition: bool = False
prompt: str
@property
def prompt(self) -> str:
prompt = "Complete the summary below. Write the letter of the corresponding word(s) for it."
return (
f"{prompt}"
if len(self.solutions) == len(self.words) else
f"{prompt}\\nThere are more words than spaces so you will not use them all."
)
Exercise = Union[FillBlanksExercise, TrueFalseExercise, MatchSentencesExercise, WriteBlanksExercise]
Exercise = Union[FillBlanksExercise, TrueFalseExercise, MatchSentencesExercise, WriteBlanksExercise, MultipleChoice]
class Context(BaseModel):

View File

@@ -8,12 +8,11 @@ from app.configs.constants import LevelExerciseType
class LevelExercises(BaseModel):
type: LevelExerciseType
quantity: int
text_size: Optional[int]
sa_qty: Optional[int]
mc_qty: Optional[int]
topic: Optional[str]
text_size: Optional[int] = None
sa_qty: Optional[int] = None
mc_qty: Optional[int] = None
topic: Optional[str] = None
class LevelExercisesDTO(BaseModel):
text: str
exercises: List[LevelExercises]
difficulty: Optional[str]
difficulty: Optional[str] = None

View File

@@ -1,9 +1,9 @@
import random
from typing import List, Dict
from typing import List, Dict, Optional
from pydantic import BaseModel
from app.configs.constants import MinTimers, ELAIAvatars
from app.configs.constants import MinTimers
class SaveSpeakingDTO(BaseModel):
@@ -21,22 +21,7 @@ class GradeSpeakingAnswersDTO(BaseModel):
class GenerateVideo1DTO(BaseModel):
avatar: str = (random.choice(list(ELAIAvatars))).name
avatar: str = Optional[str]
questions: List[str]
first_topic: str
second_topic: str
class GenerateVideo2DTO(BaseModel):
avatar: str = (random.choice(list(ELAIAvatars))).name
prompts: List[str] = []
suffix: str = ""
question: str
topic: str
class GenerateVideo3DTO(BaseModel):
avatar: str = (random.choice(list(ELAIAvatars))).name
questions: List[str]
topic: str

15
app/dtos/video.py Normal file
View File

@@ -0,0 +1,15 @@
import random
from enum import Enum
from typing import Optional
from pydantic import BaseModel
class TaskStatus(Enum):
STARTED = "STARTED"
IN_PROGRESS = "IN_PROGRESS"
COMPLETED = "COMPLETED"
ERROR = "ERROR"
class Task(BaseModel):
status: TaskStatus
result: Optional[str] = None

32
app/mappers/listening.py Normal file
View File

@@ -0,0 +1,32 @@
from typing import Dict, Any
from app.dtos.exams.listening import TrueFalseExercise, MultipleChoiceExercise, WriteBlanksExercise, ListeningExam, \
ListeningSection
class ListeningMapper:
@staticmethod
def map_to_test_model(response: Dict[str, Any]) -> ListeningExam:
sections = []
for section in response.get('sections', []):
section_exercises = []
for exercise in section['exercises']:
exercise_type = exercise['type']
if exercise_type == 'trueFalse':
section_exercises.append(TrueFalseExercise(**exercise))
elif exercise_type == 'multipleChoice':
section_exercises.append(MultipleChoiceExercise(**exercise))
elif exercise_type == 'writeBlanks':
section_exercises.append(WriteBlanksExercise(**exercise))
else:
raise ValueError(f"Unknown exercise type: {exercise_type}")
sections.append(ListeningSection(exercises=section_exercises))
return ListeningExam(
sections=sections,
minTimer=response.get('minTimer'),
module="listening"
)

View File

@@ -3,7 +3,7 @@ from typing import Dict, Any
from app.dtos.exams.reading import (
Part, Exam, Context, FillBlanksExercise,
TrueFalseExercise, MatchSentencesExercise,
WriteBlanksExercise
WriteBlanksExercise, MultipleChoice
)
@@ -20,13 +20,18 @@ class ReadingMapper:
'fillBlanks': FillBlanksExercise,
'trueFalse': TrueFalseExercise,
'matchSentences': MatchSentencesExercise,
'writeBlanks': WriteBlanksExercise
'writeBlanks': WriteBlanksExercise,
'multipleChoice': MultipleChoice,
}
exercises = []
for exercise in part_exercises:
exercise_type = exercise['type']
exercises.append(model_map[exercise_type](**exercise))
if exercise_type in model_map:
model_class = model_map[exercise_type]
exercises.append(model_class(**exercise))
else:
raise ValueError(f"Unknown exercise type: {exercise_type}")
part_kwargs = {
"exercises": exercises,

View File

@@ -29,7 +29,7 @@ class ILevelService(ABC):
pass
@abstractmethod
async def upload_level(self, upload: UploadFile) -> Dict:
async def upload_level(self, upload: UploadFile, solutions: Optional[UploadFile] = None) -> Dict:
pass
@abstractmethod

View File

@@ -1,7 +1,7 @@
import queue
from abc import ABC, abstractmethod
from queue import Queue
from typing import Dict, List
from typing import Dict, List, Any
from fastapi import UploadFile
@@ -25,5 +25,7 @@ class IListeningService(ABC):
pass
@abstractmethod
async def save_listening(self, parts: list[dict], min_timer: int, difficulty: str, listening_id: str) -> Dict:
async def import_exam(
self, exercises: UploadFile, solutions: UploadFile = None
) -> Dict[str, Any] | None:
pass

View File

@@ -6,7 +6,7 @@ class ISpeakingService(ABC):
@abstractmethod
async def get_speaking_part(
self, part: int, topic: str, difficulty: str, second_topic: Optional[str] = None
self, part: int, topic: str, second_topic: str, difficulty: str
) -> Dict:
pass
@@ -14,16 +14,3 @@ class ISpeakingService(ABC):
async def grade_speaking_task(self, task: int, answers: List[Dict]) -> Dict:
pass
@abstractmethod
async def create_videos_and_save_to_db(self, exercises: List[Dict], template: Dict, req_id: str):
pass
@abstractmethod
async def generate_video(
self, part: int, avatar: str, topic: str, questions: list[str],
*,
second_topic: Optional[str] = None,
prompts: Optional[list[str]] = None,
suffix: Optional[str] = None,
):
pass

View File

@@ -1,8 +1,22 @@
from abc import ABC, abstractmethod
from typing import Dict, List
class IVideoGeneratorService(ABC):
def __init__(self, avatars: Dict):
self._avatars = avatars
async def get_avatars(self) -> List[Dict]:
return [
{"name": name, "gender": data["avatar_gender"]}
for name, data in self._avatars.items()
]
@abstractmethod
async def create_video(self, text: str, avatar: str, title: str):
pass
@abstractmethod
async def poll_status(self, video_id: str):
pass

View File

@@ -1,6 +1,12 @@
from asyncio import gather
from typing import Dict, Optional
from uuid import uuid4
from fastapi import UploadFile
import random
from app.configs.constants import EducationalContent
from app.dtos.level import LevelExercisesDTO
from app.repositories.abc import IDocumentStore
from app.services.abc import (
@@ -41,48 +47,61 @@ class LevelService(ILevelService):
)
async def upload_level(self, upload: UploadFile) -> Dict:
return await self._upload_module.generate_level_from_file(upload)
async def upload_level(self, upload: UploadFile, solutions: Optional[UploadFile] = None) -> Dict:
return await self._upload_module.generate_level_from_file(upload, solutions)
async def _generate_exercise(self, req_exercise, start_id):
if req_exercise.type == "mcBlank":
questions = await self._mc.gen_multiple_choice("blank_space", req_exercise.quantity, start_id)
questions["variant"] = "mcBlank"
questions["type"] = "multipleChoice"
questions["prompt"] = "Choose the correct word or group of words that completes the sentences."
return questions
elif req_exercise.type == "mcUnderline":
questions = await self._mc.gen_multiple_choice("underline", req_exercise.quantity, start_id)
questions["variant"] = "mcUnderline"
questions["type"] = "multipleChoice"
questions["prompt"] = "Choose the underlined word or group of words that is not correct."
return questions
elif req_exercise.type == "passageUtas":
topic = req_exercise.topic if req_exercise.topic else random.choice(EducationalContent.TOPICS)
exercise = await self._passage_utas.gen_reading_passage_utas(
start_id,
req_exercise.quantity,
topic,
req_exercise.text_size
)
exercise["prompt"] = "Read the text and answer the questions below."
return exercise
elif req_exercise.type == "fillBlanksMC":
exercise = await self._fill_blanks.gen_fill_blanks(
start_id,
req_exercise.quantity,
req_exercise.text_size,
req_exercise.topic
)
exercise["prompt"] = "Read the text below and choose the correct word for each space."
return exercise
async def generate_exercises(self, dto: LevelExercisesDTO):
exercises = []
start_id = 1
start_ids = []
current_id = 1
for req_exercise in dto.exercises:
if req_exercise.type == "multipleChoice":
questions = await self._mc.gen_multiple_choice("normal", req_exercise.quantity, start_id)
exercises.append(questions)
start_ids.append(current_id)
current_id += req_exercise.quantity
elif req_exercise.type == "mcBlank":
questions = await self._mc.gen_multiple_choice("blank_space", req_exercise.quantity, start_id)
questions["variant"] = "mc"
exercises.append(questions)
tasks = [
self._generate_exercise(req_exercise, start_id)
for req_exercise, start_id in zip(dto.exercises, start_ids)
]
questions = await gather(*tasks)
questions = [{'id': str(uuid4()), **exercise} for exercise in questions]
elif req_exercise.type == "mcUnderline":
questions = await self._mc.gen_multiple_choice("underline", req_exercise.quantity, start_id)
exercises.append(questions)
elif req_exercise.type == "blankSpaceText":
questions = await self._blank_space.gen_blank_space_text_utas(
req_exercise.quantity, start_id, req_exercise.text_size, req_exercise.topic
)
exercises.append(questions)
elif req_exercise.type == "passageUtas":
questions = await self._passage_utas.gen_reading_passage_utas(
start_id, req_exercise.mc_qty, req_exercise.text_size
)
exercises.append(questions)
elif req_exercise.type == "fillBlanksMC":
questions = await self._passage_utas.gen_reading_passage_utas(
start_id, req_exercise.mc_qty, req_exercise.text_size
)
exercises.append(questions)
start_id = start_id + req_exercise.quantity
return exercises
return {"exercises": questions}
# Just here to support other modules that I don't know if they are supposed to still be used
async def gen_multiple_choice(self, mc_variant: str, quantity: int, start_id: int = 1):

View File

@@ -11,11 +11,12 @@ class FillBlanks:
async def gen_fill_blanks(
self, quantity: int, start_id: int, size: int, topic=None
self, start_id: int, quantity: int, size: int = 300, topic=None
):
if not topic:
topic = random.choice(EducationalContent.MTI_TOPICS)
print(quantity)
print(start_id)
messages = [
{
"role": "system",
@@ -28,19 +29,18 @@ class FillBlanks:
{
"role": "user",
"content": (
f'From the generated text choose {quantity} words (cannot be sequential words) to replace '
'once with {{id}} where id starts on ' + str(start_id) + ' and is incremented for each word. '
'The ids must be ordered throughout the text and the words must be replaced only once. '
'For each removed word you will place it in the solutions array and assign a letter from A to D,'
' then you will place that removed word and the chosen letter on the words array along with '
' other 3 other words for the remaining letter. This is a fill blanks question for an english '
'exam, so don\'t choose words completely at random.'
f'From the generated text choose exactly {quantity} words (cannot be sequential words) replace '
'each with {{id}} (starting from ' + str(start_id) + ' and incrementing), then generate a '
'JSON object containing: the modified text, a solutions array with each word\'s correct '
'letter (A-D), and a words array containing each id with four options where one is '
'the original word (matching the solution) and three are plausible but incorrect '
'alternatives that maintain grammatical consistency. '
'You cannot use repeated words!' #TODO: Solve this after
)
}
]
question = await self._llm.prediction(
GPTModels.GPT_4_O, messages, ["question"], TemperatureSettings.GEN_QUESTION_TEMPERATURE
GPTModels.GPT_4_O, messages, [], TemperatureSettings.GEN_QUESTION_TEMPERATURE
)
return {
**question,
@@ -56,7 +56,7 @@ class FillBlanks:
"solutions": [
{
"id": "",
"solution": ""
"solution": "<A,B,C or D>"
}
],
"words": [

View File

@@ -13,15 +13,12 @@ class PassageUtas:
self._mc_variants = mc_variants
async def gen_reading_passage_utas(
self, start_id, mc_quantity: int, topic: Optional[str] # sa_quantity: int,
self, start_id, mc_quantity: int, topic: Optional[str], word_size: Optional[int] # sa_quantity: int,
):
passage = await self._reading_service.generate_reading_passage(1, topic)
passage = await self._reading_service.generate_reading_passage(1, topic, word_size)
mc_exercises = await self._gen_text_multiple_choice_utas(passage["text"], start_id, mc_quantity)
#short_answer = await self._gen_short_answer_utas(passage["text"], start_id, sa_quantity)
# + sa_quantity, mc_quantity)
mc_exercises["type"] = "multipleChoice"
"""
exercises: {
"shortAnswer": short_answer,
@@ -29,11 +26,12 @@ class PassageUtas:
},
"""
return {
"exercises": mc_exercises,
"text": {
**mc_exercises,
"passage": {
"content": passage["text"],
"title": passage["title"]
}
},
"mcVariant": "passageUtas"
}
async def _gen_short_answer_utas(self, text: str, start_id: int, sa_quantity: int):

View File

@@ -2,7 +2,7 @@ import aiofiles
import os
from logging import getLogger
from typing import Dict, Any, Coroutine
from typing import Dict, Any, Coroutine, Optional
import pdfplumber
from fastapi import UploadFile
@@ -21,7 +21,7 @@ class UploadLevelModule:
self._logger = getLogger(__name__)
self._llm = openai
async def generate_level_from_file(self, file: UploadFile) -> Dict[str, Any] | None:
async def generate_level_from_file(self, file: UploadFile, solutions: Optional[UploadFile]) -> Dict[str, Any] | None:
ext, path_id = await FileHelper.save_upload(file)
FileHelper.convert_file_to_pdf(
f'./tmp/{path_id}/upload.{ext}', f'./tmp/{path_id}/exercises.pdf'

View File

@@ -1,21 +1,19 @@
import queue
import uuid
import asyncio
from logging import getLogger
from queue import Queue
import random
from typing import Dict, List
from typing import Dict, Any
from starlette.datastructures import UploadFile
from app.dtos.listening import GenerateListeningExercises, Dialog
from app.dtos.listening import GenerateListeningExercises, Dialog, ListeningExercises
from app.repositories.abc import IFileStorage, IDocumentStore
from app.services.abc import IListeningService, ILLMService, ITextToSpeechService, ISpeechToTextService
from app.configs.question_templates import getListeningTemplate, getListeningPartTemplate
from app.configs.constants import (
NeuralVoices, GPTModels, TemperatureSettings, FilePaths, MinTimers, ExamVariant, EducationalContent,
NeuralVoices, GPTModels, TemperatureSettings, EducationalContent,
FieldsAndExercises
)
from app.helpers import ExercisesHelper, FileHelper
from app.helpers import FileHelper
from .import_listening import ImportListeningModule
from .multiple_choice import MultipleChoice
from .write_blank_forms import WriteBlankForms
from .write_blanks import WriteBlanks
@@ -49,6 +47,7 @@ class ListeningService(IListeningService):
self._write_blanks = WriteBlanks(llm)
self._write_blanks_forms = WriteBlankForms(llm)
self._write_blanks_notes = WriteBlankNotes(llm)
self._import = ImportListeningModule(llm)
self._sections = {
"section_1": {
"topic": EducationalContent.TWO_PEOPLE_SCENARIOS,
@@ -84,6 +83,12 @@ class ListeningService(IListeningService):
}
}
async def import_exam(
self, exercises: UploadFile, solutions: UploadFile = None
) -> Dict[str, Any] | None:
return await self._import.import_from_file(exercises, solutions)
async def generate_listening_dialog(self, section: int, topic: str, difficulty: str):
return await self._sections[f'section_{section}']["generate_dialogue"](section, topic)
@@ -92,83 +97,63 @@ class ListeningService(IListeningService):
dialog = await self._stt.speech_to_text(f'./tmp/{path_id}/upload.{ext}')
FileHelper.remove_directory(f'./tmp/{path_id}')
async def get_listening_question(self, section: int, dto: GenerateListeningExercises):
dialog_type = self._sections[f'section_{section}']["type"]
exercises = []
start_id = 1
for req_exercise in dto.exercises:
if req_exercise.type == "multipleChoice" or req_exercise.type == "multipleChoice3Options":
n_options = 4 if "multipleChoice" else 3
question = await self._multiple_choice.gen_multiple_choice(
dialog_type, dto.text, req_exercise.quantity, start_id, dto.difficulty, n_options
)
exercises.append(question)
self._logger.info(f"Added multiple choice: {question}")
elif req_exercise.type == "writeBlanksQuestions":
question = await self._write_blanks.gen_write_blanks_questions(
dialog_type, dto.text, req_exercise.quantity, start_id, dto.difficulty
)
question["variant"] = "questions"
exercises.append(question)
self._logger.info(f"Added write blanks questions: {question}")
elif req_exercise.type == "writeBlanksFill":
question = await self._write_blanks_notes.gen_write_blanks_notes(
dialog_type, dto.text, req_exercise.quantity, start_id, dto.difficulty
)
question["variant"] = "fill"
exercises.append(question)
self._logger.info(f"Added write blanks notes: {question}")
elif req_exercise.type == "writeBlanksForm":
question = await self._write_blanks_forms.gen_write_blanks_form(
dialog_type, dto.text, req_exercise.quantity, start_id, dto.difficulty
)
question["variant"] = "form"
exercises.append(question)
self._logger.info(f"Added write blanks form: {question}")
start_id = start_id + req_exercise.quantity
return {"exercises": exercises}
async def generate_mp3(self, dto: Dialog) -> bytes:
return await self._tts.text_to_speech(dto)
async def save_listening(self, parts: list[dict], min_timer: int, difficulty: str, listening_id: str):
template = getListeningTemplate()
template['difficulty'] = difficulty
for i, part in enumerate(parts, start=0):
part_template = getListeningPartTemplate()
async def get_listening_question(self, section: int, dto: GenerateListeningExercises):
dialog_type = self._sections[f'section_{section}']["type"]
start_id = 1
exercise_tasks = []
file_name = str(uuid.uuid4()) + ".mp3"
sound_file_path = FilePaths.AUDIO_FILES_PATH + file_name
firebase_file_path = FilePaths.FIREBASE_LISTENING_AUDIO_FILES_PATH + file_name
if "conversation" in part["text"]:
await self._tts.text_to_speech(part["text"]["conversation"], sound_file_path)
else:
await self._tts.text_to_speech(part["text"], sound_file_path)
file_url = await self._file_storage.upload_file_firebase_get_url(firebase_file_path, sound_file_path)
for req_exercise in dto.exercises:
exercise_tasks.append(
self._generate_exercise(
req_exercise,
dialog_type,
dto.text,
start_id,
dto.difficulty
)
)
start_id += req_exercise.quantity
part_template["audio"]["source"] = file_url
part_template["exercises"] = part["exercises"]
return {"exercises": await asyncio.gather(*exercise_tasks) }
template['parts'].append(part_template)
async def _generate_exercise(
self, req_exercise: ListeningExercises, dialog_type: str, text: str, start_id: int, difficulty: str
):
if req_exercise.type == "multipleChoice" or req_exercise.type == "multipleChoice3Options":
n_options = 4 if req_exercise.type == "multipleChoice" else 3
question = await self._multiple_choice.gen_multiple_choice(
dialog_type, text, req_exercise.quantity, start_id, difficulty, n_options
)
self._logger.info(f"Added multiple choice: {question}")
return question
if min_timer != MinTimers.LISTENING_MIN_TIMER_DEFAULT:
template["minTimer"] = min_timer
template["variant"] = ExamVariant.PARTIAL.value
else:
template["variant"] = ExamVariant.FULL.value
elif req_exercise.type == "writeBlanksQuestions":
question = await self._write_blanks.gen_write_blanks_questions(
dialog_type, text, req_exercise.quantity, start_id, difficulty
)
question["variant"] = "questions"
self._logger.info(f"Added write blanks questions: {question}")
return question
elif req_exercise.type == "writeBlanksFill":
question = await self._write_blanks_notes.gen_write_blanks_notes(
dialog_type, text, req_exercise.quantity, start_id, difficulty
)
question["variant"] = "fill"
self._logger.info(f"Added write blanks notes: {question}")
return question
elif req_exercise.type == "writeBlanksForm":
question = await self._write_blanks_forms.gen_write_blanks_form(
dialog_type, text, req_exercise.quantity, start_id, difficulty
)
question["variant"] = "form"
self._logger.info(f"Added write blanks form: {question}")
return question
listening_id = await self._document_store.save_to_db("listening", template, listening_id)
if listening_id:
return {**template, "id": listening_id}
else:
raise Exception("Failed to save question: " + str(parts))
# ==================================================================================================================
# generate_listening_question helpers

View File

@@ -0,0 +1,180 @@
from logging import getLogger
from typing import Dict, Any
from uuid import uuid4
import aiofiles
from fastapi import UploadFile
from app.dtos.exams.listening import ListeningExam
from app.helpers import FileHelper
from app.mappers.listening import ListeningMapper
from app.services.abc import ILLMService
class ImportListeningModule:
def __init__(self, llm_service: ILLMService):
self._logger = getLogger(__name__)
self._llm = llm_service
async def import_from_file(
self,
exercises: UploadFile,
audio: UploadFile,
solutions: UploadFile = None
) -> Dict[str, Any] | None:
path_id = str(uuid4())
ext, _ = await FileHelper.save_upload(exercises, "exercises", path_id)
FileHelper.convert_file_to_html(
f'./tmp/{path_id}/exercises.{ext}',
f'./tmp/{path_id}/exercises.html'
)
if solutions:
ext, _ = await FileHelper.save_upload(solutions, "solutions", path_id)
FileHelper.convert_file_to_html(
f'./tmp/{path_id}/solutions.{ext}',
f'./tmp/{path_id}/solutions.html'
)
response = await self._get_listening_sections(path_id, solutions is not None)
FileHelper.remove_directory(f'./tmp/{path_id}')
if response:
return response.model_dump(exclude_none=True)
return None
async def _get_listening_sections(
self,
path_id: str,
has_solutions: bool = False
) -> ListeningExam:
async with aiofiles.open(
f'./tmp/{path_id}/exercises.html', 'r', encoding='utf-8'
) as f:
exercises_html = await f.read()
messages = [
self._instructions(has_solutions),
{
"role": "user",
"content": f"Listening exercise sheet:\n\n{exercises_html}"
}
]
if has_solutions:
async with aiofiles.open(
f'./tmp/{path_id}/solutions.html', 'r', encoding='utf-8'
) as f:
solutions_html = await f.read()
messages.append({
"role": "user",
"content": f"Solutions:\n\n{solutions_html}"
})
return await self._llm.pydantic_prediction(
messages,
ListeningMapper.map_to_test_model,
str(self._listening_json_schema())
)
@staticmethod
def _multiple_choice_template() -> dict:
return {
"type": "multipleChoice",
"prompt": "<general instructions for this section>",
"questions": [
{
"id": "<question number as string>",
"prompt": "<question text>",
"options": [
{
"id": "<A/B/C/D>",
"text": "<option text>"
}
],
"solution": "<correct option letter>",
"variant": "text"
}
]
}
@staticmethod
def _write_blanks_questions_template() -> dict:
return {
"type": "writeBlanks",
"maxWords": "<number>",
"prompt": "<instructions>",
"text": "<questions separated by newlines '\n' and blanks {{id}} in them the blanks can only occur at the end of sentence>",
"solutions": [
{
"id": "<question number as string>",
"solution": ["<acceptable answer(s)>"]
}
],
"variant": "questions"
}
@staticmethod
def _write_blanks_fill_template() -> dict:
return {
"type": "writeBlanks",
"maxWords": "<number>",
"prompt": "<instructions>",
"text": "<A summary with blanks denoted by {{id}}>",
"solutions": [
{
"id": "<blank number as string inside {{}}>",
"solution": ["<correct word>"]
}
],
"variant": "fill"
}
@staticmethod
def _write_blanks_form_template() -> dict:
return {
"type": "writeBlanks",
"maxWords": "<number>",
"prompt": "<instructions>",
"text": "<questions separated by newlines '\n' and blanks {{id}} in them the blanks can happen mid text>",
"solutions": [
{
"id": "<blank number as string inside {{}}>",
"solution": ["<correct word>"]
}
],
"variant": "form"
}
def _instructions(self, has_solutions: bool = False) -> Dict[str, str]:
solutions_str = " and its solutions" if has_solutions else ""
return {
"role": "system",
"content": (
f"You are processing a listening test exercise sheet{solutions_str}. "
"Structure each exercise exactly according to these json templates:\n\n"
f"1. Multiple Choice Questions:\n{self._multiple_choice_template()}\n\n"
f"2. Write Blanks - Questions format:\n{self._write_blanks_questions_template()}\n\n"
f"3. Write Blanks - Fill format:\n{self._write_blanks_fill_template()}\n\n"
f"4. Write Blanks - Form format:\n{self._write_blanks_form_template()}\n\n"
"\nImportant rules:\n"
"1. Keep exact question numbering from the original\n"
"2. Include all options for multiple choice questions\n"
"3. Mark blanks with {{id}} where id is the question number\n"
"4. Set maxWords according to the instructions\n"
"5. Include all possible correct answers in solution arrays\n"
"6. Maintain exact spacing and formatting from templates\n"
"7. Use appropriate variant for writeBlanks (questions/fill/form)\n"
"8. For text fields, use actual newlines between questions/sentences\n"
)
}
def _listening_json_schema(self) -> Dict[str, Any]:
return {
"exercises": [
self._multiple_choice_template(),
self._write_blanks_questions_template(),
self._write_blanks_fill_template(),
self._write_blanks_form_template()
]
}

View File

@@ -1,3 +1,4 @@
import asyncio
from logging import getLogger
from fastapi import UploadFile
@@ -77,55 +78,63 @@ class ReadingService(IReadingService):
TemperatureSettings.GEN_QUESTION_TEMPERATURE
)
async def _generate_single_exercise(self, req_exercise, text: str, start_id: int, difficulty: str) -> dict:
if req_exercise.type == "fillBlanks":
question = await self._fill_blanks.gen_summary_fill_blanks_exercise(
text, req_exercise.quantity, start_id, difficulty, req_exercise.num_random_words
)
self._logger.info(f"Added fill blanks: {question}")
return question
elif req_exercise.type == "trueFalse":
question = await self._true_false.gen_true_false_not_given_exercise(
text, req_exercise.quantity, start_id, difficulty
)
self._logger.info(f"Added trueFalse: {question}")
return question
elif req_exercise.type == "writeBlanks":
question = await self._write_blanks.gen_write_blanks_exercise(
text, req_exercise.quantity, start_id, difficulty, req_exercise.max_words
)
if ExercisesHelper.answer_word_limit_ok(question):
self._logger.info(f"Added write blanks: {question}")
return question
else:
self._logger.info("Did not add write blanks because it did not respect word limit")
return {}
elif req_exercise.type == "paragraphMatch":
question = await self._paragraph_match.gen_paragraph_match_exercise(
text, req_exercise.quantity, start_id
)
self._logger.info(f"Added paragraph match: {question}")
return question
elif req_exercise.type == "ideaMatch":
question = await self._idea_match.gen_idea_match_exercise(
text, req_exercise.quantity, start_id
)
question["variant"] = "ideaMatch"
self._logger.info(f"Added idea match: {question}")
return question
async def generate_reading_exercises(self, dto: ReadingDTO):
exercises = []
exercise_tasks = []
start_id = 1
for req_exercise in dto.exercises:
if req_exercise.type == "fillBlanks":
question = await self._fill_blanks.gen_summary_fill_blanks_exercise(
dto.text, req_exercise.quantity, start_id, dto.difficulty, req_exercise.num_random_words
exercise_tasks.append(
self._generate_single_exercise(
req_exercise,
dto.text,
start_id,
dto.difficulty
)
exercises.append(question)
self._logger.info(f"Added fill blanks: {question}")
elif req_exercise.type == "trueFalse":
question = await self._true_false.gen_true_false_not_given_exercise(
dto.text, req_exercise.quantity, start_id, dto.difficulty
)
exercises.append(question)
self._logger.info(f"Added trueFalse: {question}")
elif req_exercise.type == "writeBlanks":
question = await self._write_blanks.gen_write_blanks_exercise(
dto.text, req_exercise.quantity, start_id, dto.difficulty, req_exercise.max_words
)
if ExercisesHelper.answer_word_limit_ok(question):
exercises.append(question)
self._logger.info(f"Added write blanks: {question}")
else:
exercises.append({})
self._logger.info("Did not add write blanks because it did not respect word limit")
elif req_exercise.type == "paragraphMatch":
question = await self._paragraph_match.gen_paragraph_match_exercise(
dto.text, req_exercise.quantity, start_id
)
exercises.append(question)
self._logger.info(f"Added paragraph match: {question}")
elif req_exercise.type == "ideaMatch":
question = await self._idea_match.gen_idea_match_exercise(
dto.text, req_exercise.quantity, start_id
)
question["variant"] = "ideaMatch"
exercises.append(question)
self._logger.info(f"Added idea match: {question}")
start_id = start_id + req_exercise.quantity
)
start_id += req_exercise.quantity
return {
"exercises": exercises
"exercises": await asyncio.gather(*exercise_tasks)
}

View File

@@ -39,7 +39,7 @@ class ImportReadingModule:
exercises_html = await f.read()
messages = [
self._instructions(),
self._instructions(solutions),
{
"role": "user",
"content": f"Exam question sheet:\n\n{exercises_html}"
@@ -66,18 +66,20 @@ class ImportReadingModule:
self._write_blanks(),
self._fill_blanks(),
self._match_sentences(),
self._true_false()
self._true_false(),
self._multiple_choice()
]
return json
@staticmethod
def _reading_exam_template():
return {
"minTimer": "<number of minutes as int not string>",
"minTimer": "<integer representing minutes allowed for the exam>",
"parts": [
{
"text": {
"title": "<title of the passage>",
"content": "<the text of the passage>",
"title": "<title of the reading passage>",
"content": "<full text content of the reading passage>",
},
"exercises": []
}
@@ -87,17 +89,18 @@ class ImportReadingModule:
@staticmethod
def _write_blanks():
return {
"maxWords": "<number of max words return the int value not string>",
"maxWords": "<integer max words allowed per answer>",
"solutions": [
{
"id": "<number of the question as string>",
"id": "<question number as string>",
"solution": [
"<at least one solution can have alternative solutions (that dont exceed maxWords)>"
"<acceptable answer(s) within maxWords limit>"
]
},
}
],
"text": "<all the questions formatted in this way: <question>{{<id>}}\\n<question2>{{<id2>}}\\n >",
"type": "writeBlanks"
"text": "<numbered questions with format: <question text>{{<question number>}}\\n>",
"type": "writeBlanks",
"prompt": "<specific instructions for this exercise section>"
}
@staticmethod
@@ -105,19 +108,20 @@ class ImportReadingModule:
return {
"options": [
{
"id": "<uppercase letter that identifies a paragraph>",
"sentence": "<either a heading or an idea>"
"id": "<paragraph letter A-F>",
"sentence": "<THIS NEEDS TO BE A PARAGRAPH OF THE SECTION TEXT>"
}
],
"sentences": [
{
"id": "<the question id not the option id>",
"solution": "<id in options>",
"sentence": "<heading or an idea>",
"id": "<question number as string>",
"solution": "<matching paragraph letter>",
"sentence": "<A SHORT SENTENCE THAT CONVEYS AND IDEA OR HEADING>"
}
],
"type": "matchSentences",
"variant": "<heading OR ideaMatch (try to figure it out via the exercises instructions)>"
"variant": "<heading OR ideaMatch (try to figure it out via the exercises instructions)>",
"prompt": "<specific instructions for this exercise section>"
}
@staticmethod
@@ -125,12 +129,34 @@ class ImportReadingModule:
return {
"questions": [
{
"prompt": "<question>",
"solution": "<can only be one of these [\"true\", \"false\", \"not_given\"]>",
"id": "<the question id>"
"id": "<question number>",
"prompt": "<statement to evaluate>",
"solution": "<one of: true, false, not_given>",
}
],
"type": "trueFalse"
"type": "trueFalse",
"prompt": "<specific instructions including T/F/NG marking scheme>"
}
@staticmethod
def _multiple_choice():
return {
"questions": [
{
"id": "<question number>",
"prompt": "<question text>",
"options": [
{
"id": "<A, B, or C>",
"text": "<option text>"
}
],
"solution": "<correct option letter>",
"variant": "text"
}
],
"type": "multipleChoice",
"prompt": "<specific instructions for this exercise section>"
}
@staticmethod
@@ -138,53 +164,69 @@ class ImportReadingModule:
return {
"solutions": [
{
"id": "<blank id>",
"solution": "<word>"
"id": "<blank number>",
"solution": "<correct word>"
}
],
"text": "<section of text with blanks denoted by {{<blank id>}}>",
"text": "<text passage with blanks marked as {{<blank number>}}>",
"type": "fillBlanks",
"words": [
{
"letter": "<uppercase letter that ids the words (may not be included and if not start at A)>",
"word": "<word>"
"letter": "<word identifier letter>",
"word": "<word from word bank>"
}
]
],
"prompt": "<specific instructions for this exercise section>"
}
def _instructions(self, solutions = False):
def _instructions(self, solutions=False):
solutions_str = " and its solutions" if solutions else ""
tail = (
"The solutions were not supplied so you will have to solve them. Do your utmost to get all the information and"
"all the solutions right!"
if not solutions else
"Do your utmost to correctly identify the sections, its exercises and respective solutions"
"Parse the exam carefully and identify:\n"
"1. Time limit from instructions\n"
"2. Reading passage title and full content\n"
"3. All exercise sections and their specific instructions\n"
"4. Question numbering and grouping\n"
"5. Word limits and formatting requirements\n"
"6. Specific marking schemes (e.g., T/F/NG)\n\n"
+ (
"Solutions were not provided - analyze the passage carefully to determine correct answers."
if not solutions else
"Use the provided solutions to fill in all answer fields accurately."
)
+
"Pay extra attention to fillblanks exercises the solution and option wording must match in case!"
"There can't be options in lowercase and solutions in uppercase!"
"Also PAY ATTENTION TO SECTIONS, these most likely indicate parts, and in each section/part there "
"should be a text, if there isn't a title for it choose a reasonable one based on its contents."
)
return {
"role": "system",
"content": (
f"You will receive html pertaining to an english exam question sheet{solutions_str}. Your job is to "
f"structure the data into a single json with this template: {self._reading_exam_template()}\n"
f"You are processing an English reading comprehension exam{solutions_str}. Structure the data according "
f"to this json template: {self._reading_exam_template()}\n\n"
"You will need find out how many parts the exam has a correctly place its exercises. You will "
"encounter 4 types of exercises:\n"
" - \"writeBlanks\": short answer questions that have a answer word limit, generally two or three\n"
" - \"matchSentences\": a sentence needs to be matched with a paragraph\n"
" - \"trueFalse\": questions that its answers can only be true false or not given\n"
" - \"fillBlanks\": a text that has blank spaces on a section of text and a word bank which "
"contains the solutions and sometimes random words to throw off the students\n"
"These 4 types of exercises will need to be placed in the correct json template inside each part, "
"the templates are as follows:\n "
"The exam contains these exercise types:\n"
"1. \"writeBlanks\": Short answer questions with strict word limits\n"
"2. \"matchSentences\": Match headings or ideas with paragraphs, the sentences field\n"
"3. \"trueFalse\": Evaluate statements as True/False/Not Given\n"
"4. \"fillBlanks\": Complete text using provided word bank\n"
"5. \"multipleChoice\": Select correct option from choices\n\n"
"Exercise templates:\n"
f"writeBlanks: {self._write_blanks()}\n"
f"matchSentences: {self._match_sentences()}\n"
f"trueFalse: {self._true_false()}\n"
f"fillBlanks: {self._fill_blanks()}\n\n"
f"fillBlanks: {self._fill_blanks()}\n"
f"multipleChoice: {self._multiple_choice()}\n\n"
"Important details to capture:\n"
"- Exercise section instructions and constraints\n"
"- Question numbering and grouping\n"
"- Word limits and formatting requirements\n"
"- Marking schemes and answer formats\n\n"
f"{tail}"
)
}

View File

@@ -7,7 +7,7 @@ from typing import Dict, List, Optional
from app.configs.constants import (
FieldsAndExercises, GPTModels, TemperatureSettings,
ELAIAvatars, FilePaths
FilePaths
)
from app.helpers import TextHelper
from app.repositories.abc import IFileStorage, IDocumentStore
@@ -17,14 +17,12 @@ from app.services.abc import ISpeakingService, ILLMService, IVideoGeneratorServi
class SpeakingService(ISpeakingService):
def __init__(
self, llm: ILLMService, vid_gen: IVideoGeneratorService,
file_storage: IFileStorage, document_store: IDocumentStore,
self, llm: ILLMService,
file_storage: IFileStorage,
stt: ISpeechToTextService
):
self._llm = llm
self._vid_gen = vid_gen
self._file_storage = file_storage
self._document_store = document_store
self._stt = stt
self._logger = logging.getLogger(__name__)
@@ -102,7 +100,7 @@ class SpeakingService(ISpeakingService):
}
async def get_speaking_part(
self, part: int, topic: str, difficulty: str, second_topic: Optional[str] = None
self, part: int, topic: str, second_topic: str, difficulty: str
) -> Dict:
task_values = self._tasks[f'task_{part}']['get']
@@ -416,193 +414,6 @@ class SpeakingService(ISpeakingService):
)
return response["fixed_text"]
async def create_videos_and_save_to_db(self, exercises, template, req_id):
template = await self._create_video_per_part(exercises, template, 1, req_id)
template = await self._create_video_per_part(exercises, template, 2, req_id)
template = await self._create_video_per_part(exercises, template, 3, req_id)
await self._document_store.save_to_db_with_id("speaking", template, req_id)
self._logger.info(f'Saved speaking to DB with id {req_id} : {str(template)}')
async def _create_video_per_part(self, exercises: List[Dict], template: Dict, part: int, req_id: str):
avatar = (random.choice(list(ELAIAvatars))).name
template_index = part - 1
# Using list comprehension to find the element with the desired value in the 'type' field
found_exercises = [element for element in exercises if element.get('type') == part]
# Check if any elements were found
if found_exercises:
exercise = found_exercises[0]
self._logger.info(f'Creating video for speaking part {part}')
if part in {1, 3}:
questions = []
for question in exercise["questions"]:
result = await self._create_video(
question,
avatar,
f'Failed to create video for part {part} question: {str(exercise["question"])}',
req_id
)
if result is not None:
video = {
"text": question,
"video_path": result["video_path"],
"video_url": result["video_url"]
}
questions.append(video)
template["exercises"][template_index]["prompts"] = questions
if part == 1:
template["exercises"][template_index]["first_title"] = exercise["first_topic"]
template["exercises"][template_index]["second_title"] = exercise["second_topic"]
else:
template["exercises"][template_index]["title"] = exercise["topic"]
else:
result = await self._create_video(
exercise["question"],
avatar,
f'Failed to create video for part {part} question: {str(exercise["question"])}',
req_id
)
if result is not None:
template["exercises"][template_index]["prompts"] = exercise["prompts"]
template["exercises"][template_index]["text"] = exercise["question"]
template["exercises"][template_index]["title"] = exercise["topic"]
template["exercises"][template_index]["video_url"] = result["video_url"]
template["exercises"][template_index]["video_path"] = result["video_path"]
if not found_exercises:
template["exercises"].pop(template_index)
return template
async def generate_video(
self, part: int, avatar: str, topic: str, questions: list[str],
*,
second_topic: Optional[str] = None,
prompts: Optional[list[str]] = None,
suffix: Optional[str] = None,
):
params = locals()
params.pop('self')
request_id = str(uuid.uuid4())
self._logger.info(
f'POST - generate_video_{part} - Received request to generate video {part}. '
f'Use this id to track the logs: {request_id} - Request data: " + {params}'
)
part_questions = self._get_part_questions(part, questions, avatar)
videos = []
self._logger.info(f'POST - generate_video_{part} - {request_id} - Creating videos for speaking part {part}.')
for question in part_questions:
self._logger.info(f'POST - generate_video_{part} - {request_id} - Creating video for question: {question}')
result = await self._create_video(
question,
avatar,
'POST - generate_video_{p} - {r} - Failed to create video for part {p} question: {q}'.format(
p=part, r=request_id, q=question
),
request_id
)
if result is not None:
self._logger.info(f'POST - generate_video_{part} - {request_id} - Video created')
self._logger.info(
f'POST - generate_video_{part} - {request_id} - Uploaded video to firebase: {result["video_url"]}'
)
video = {
"text": question,
"video_path": result["video_path"],
"video_url": result["video_url"]
}
videos.append(video)
if part == 2 and len(videos) == 0:
raise Exception(f'Failed to create video for part 2 question: {questions[0]}')
return self._get_part_response(part, topic, videos, second_topic, prompts, suffix)
@staticmethod
def _get_part_questions(part: int, questions: list[str], avatar: str):
part_questions: list[str] = []
if part == 1:
id_to_name = {
"5912afa7c77c47d3883af3d874047aaf": "MATTHEW",
"9e58d96a383e4568a7f1e49df549e0e4": "VERA",
"d2cdd9c0379a4d06ae2afb6e5039bd0c": "EDWARD",
"045cb5dcd00042b3a1e4f3bc1c12176b": "TANYA",
"1ae1e5396cc444bfad332155fdb7a934": "KAYLA",
"0ee6aa7cc1084063a630ae514fccaa31": "JEROME",
"5772cff935844516ad7eeff21f839e43": "TYLER",
}
part_questions.extend(
[
"Hello my name is " + id_to_name.get(avatar) + ", what is yours?",
"Do you work or do you study?",
*questions
]
)
elif part == 2:
# Removed as the examiner should not say what is on the card.
# question = question + " In your answer you should consider: " + " ".join(prompts) + suffix
part_questions.append(f'{questions[0]}\nYou have 1 minute to take notes.')
elif part == 3:
part_questions = questions
return part_questions
@staticmethod
def _get_part_response(
part: int,
topic: str,
videos: list[dict],
second_topic: Optional[str],
prompts: Optional[list[str]],
suffix: Optional[str]
):
response = {}
if part == 1:
response = {
"prompts": videos,
"first_title": topic,
"second_title": second_topic,
"type": "interactiveSpeaking"
}
if part == 2:
response = {
"prompts": prompts,
"title": topic,
"suffix": suffix,
"type": "speaking",
# includes text, video_url and video_path
**videos[0]
}
if part == 3:
response = {
"prompts": videos,
"title": topic,
"type": "interactiveSpeaking",
}
response["id"] = str(uuid.uuid4())
return response
async def _create_video(self, question: str, avatar: str, error_message: str, title: str):
result = await self._vid_gen.create_video(question, avatar, title)
if result is not None:
sound_file_path = FilePaths.VIDEO_FILES_PATH + result
firebase_file_path = FilePaths.FIREBASE_SPEAKING_VIDEO_FILES_PATH + result
url = await self._file_storage.upload_file_firebase_get_url(firebase_file_path, sound_file_path)
return {
"video_path": firebase_file_path,
"video_url": url
}
self._logger.error(error_message)
return None
@staticmethod
def _grade_template():

View File

@@ -1,15 +1,8 @@
import asyncio
import os
import logging
from asyncio import sleep
from copy import deepcopy
import aiofiles
from charset_normalizer.md import getLogger
from logging import getLogger
from httpx import AsyncClient
from app.configs.constants import ELAIAvatars
from app.dtos.video import Task, TaskStatus
from app.services.abc import IVideoGeneratorService
@@ -17,7 +10,9 @@ class ELAI(IVideoGeneratorService):
_ELAI_ENDPOINT = 'https://apis.elai.io/api/v1/videos'
def __init__(self, client: AsyncClient, token: str, conf: dict):
def __init__(self, client: AsyncClient, token: str, avatars: dict, *, conf: dict):
super().__init__(deepcopy(avatars))
self._http_client = client
self._conf = deepcopy(conf)
self._logger = getLogger(__name__)
@@ -31,14 +26,13 @@ class ELAI(IVideoGeneratorService):
"Authorization": f"Bearer {token}"
}
async def create_video(self, text: str, avatar: str):
avatar_url = ELAIAvatars[avatar].value.get("avatar_url")
avatar_code = ELAIAvatars[avatar].value.get("avatar_code")
avatar_gender = ELAIAvatars[avatar].value.get("avatar_gender")
avatar_canvas = ELAIAvatars[avatar].value.get("avatar_canvas")
voice_id = ELAIAvatars[avatar].value.get("voice_id")
voice_provider = ELAIAvatars[avatar].value.get("voice_provider")
avatar_url = self._avatars[avatar].get("avatar_url")
avatar_code = self._avatars[avatar].get("avatar_code")
avatar_gender = self._avatars[avatar].get("avatar_gender")
avatar_canvas = self._avatars[avatar].get("avatar_canvas")
voice_id = self._avatars[avatar].get("voice_id")
voice_provider = self._avatars[avatar].get("voice_provider")
self._conf["slides"][0]["canvas"]["objects"][0]["src"] = avatar_url
self._conf["slides"]["avatar"] = {
@@ -59,37 +53,32 @@ class ELAI(IVideoGeneratorService):
if video_id:
await self._http_client.post(f'{self._ELAI_ENDPOINT}/render/{video_id}', headers=self._GET_HEADER)
return Task(
result=video_id,
status=TaskStatus.STARTED,
)
else:
return Task(status=TaskStatus.ERROR)
while True:
response = await self._http_client.get(f'{self._ELAI_ENDPOINT}/{video_id}', headers=self._GET_HEADER)
response_data = response.json()
async def pool_status(self, video_id: str) -> Task:
response = await self._http_client.get(f'{self._ELAI_ENDPOINT}/{video_id}', headers=self._GET_HEADER)
response_data = response.json()
if response_data['status'] == 'ready':
self._logger.info(response_data)
download_url = response_data.get('url')
output_directory = 'download-video/'
output_filename = video_id + '.mp4'
response = await self._http_client.get(download_url)
if response.status_code == 200:
os.makedirs(output_directory, exist_ok=True)
output_path = os.path.join(output_directory, output_filename)
with open(output_path, 'wb') as f:
f.write(response.content)
self._logger.info(f"File '{output_filename}' downloaded successfully.")
return output_filename
else:
self._logger.error(f"Failed to download file. Status code: {response.status_code}")
return None
elif response_data['status'] == 'failed':
self._logger.error('Video creation failed.')
break
else:
self._logger.info('Video is still processing. Checking again in 10 seconds...')
await sleep(10)
if response_data['status'] == 'ready':
self._logger.info(response_data)
return Task(
status=TaskStatus.COMPLETED,
result=response_data.get('url')
)
elif response_data['status'] == 'failed':
self._logger.error('Video creation failed.')
return Task(
status=TaskStatus.ERROR,
result=response_data.get('url')
)
else:
self._logger.info('Video is still processing.')
return Task(
status=TaskStatus.IN_PROGRESS,
result=video_id
)

View File

@@ -0,0 +1,58 @@
{
"Gia": {
"avatar_code": "gia.business",
"avatar_gender": "female",
"avatar_url": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/gia/business/gia_business.png",
"avatar_canvas": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/gia/business/gia_business.png",
"voice_id": "EXAVITQu4vr4xnSDxMaL",
"voice_provider": "elevenlabs"
},
"Vadim": {
"avatar_code": "vadim.business",
"avatar_gender": "male",
"avatar_url": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/vadim/business/vadim_business.png",
"avatar_canvas": "https://d3u63mhbhkevz8.cloudfront.net/common/vadim/business/vadim_business.png",
"voice_id": "flq6f7yk4E4fJM5XTYuZ",
"voice_provider": "elevenlabs"
},
"Orhan": {
"avatar_code": "orhan.business",
"avatar_gender": "male",
"avatar_url": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/orhan/business/orhan.png",
"avatar_canvas": "https://d3u63mhbhkevz8.cloudfront.net/common/orhan/business/orhan.png",
"voice_id": "en-US-AndrewMultilingualNeural",
"voice_provider": "azure"
},
"Flora": {
"avatar_code": "flora.business",
"avatar_gender": "female",
"avatar_url": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/flora/business/flora_business.png",
"avatar_canvas": "https://d3u63mhbhkevz8.cloudfront.net/common/flora/business/flora_business.png",
"voice_id": "en-US-JaneNeural",
"voice_provider": "azure"
},
"Scarlett": {
"avatar_code": "scarlett.business",
"avatar_gender": "female",
"avatar_url": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/scarlett/business/scarlett_business.png",
"avatar_canvas": "https://d3u63mhbhkevz8.cloudfront.net/common/scarlett/business/scarlett_business.png",
"voice_id": "en-US-NancyNeural",
"voice_provider": "azure"
},
"Parker": {
"avatar_code": "parker.casual",
"avatar_gender": "male",
"avatar_url": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/parker/casual/parker_casual.png",
"avatar_canvas": "https://d3u63mhbhkevz8.cloudfront.net/common/parker/casual/parker_casual.png",
"voice_id": "en-US-TonyNeural",
"voice_provider": "azure"
},
"Ethan": {
"avatar_code": "ethan.business",
"avatar_gender": "male",
"avatar_url": "https://elai-avatars.s3.us-east-2.amazonaws.com/common/ethan/business/ethan_business_low.png",
"avatar_canvas": "https://d3u63mhbhkevz8.cloudfront.net/common/ethan/business/ethan_business_low.png",
"voice_id": "en-US-JasonNeural",
"voice_provider": "azure"
}
}

View File

@@ -1,89 +0,0 @@
import asyncio
import os
import logging
import aiofiles
from httpx import AsyncClient
from app.services.abc import IVideoGeneratorService
class Heygen(IVideoGeneratorService):
_GET_VIDEO_URL = 'https://api.heygen.com/v1/video_status.get'
def __init__(self, client: AsyncClient, token: str):
pass
self._get_header = {
'X-Api-Key': token
}
self._post_header = {
'X-Api-Key': token,
'Content-Type': 'application/json'
}
self._http_client = client
self._logger = logging.getLogger(__name__)
async def create_video(self, text: str, avatar: str, title="video_title"):
pass
# POST TO CREATE VIDEO
create_video_url = 'https://api.heygen.com/v2/template/' + avatar + '/generate'
data = {
"test": False,
"caption": False,
"title": "video_title",
"variables": {
"script_here": {
"name": "script_here",
"type": "text",
"properties": {
"content": text
}
}
}
}
response = await self._http_client.post(create_video_url, headers=self._post_header, json=data)
self._logger.info(response.status_code)
self._logger.info(response.json())
# GET TO CHECK STATUS AND GET VIDEO WHEN READY
video_id = response.json()["data"]["video_id"]
params = {
'video_id': response.json()["data"]["video_id"]
}
response = {}
status = "processing"
error = None
while status != "completed" and error is None:
response = await self._http_client.get(self._GET_VIDEO_URL, headers=self._get_header, params=params)
response_data = response.json()
status = response_data["data"]["status"]
error = response_data["data"]["error"]
if status != "completed" and error is None:
self._logger.info(f"Status: {status}")
await asyncio.sleep(10) # Wait for 10 second before the next request
self._logger.info(response.status_code)
self._logger.info(response.json())
# DOWNLOAD VIDEO
download_url = response.json()['data']['video_url']
output_directory = 'download-video/'
output_filename = video_id + '.mp4'
response = await self._http_client.get(download_url)
if response.status_code == 200:
os.makedirs(output_directory, exist_ok=True) # Create the directory if it doesn't exist
output_path = os.path.join(output_directory, output_filename)
async with aiofiles.open(output_path, 'wb') as f:
await f.write(response.content)
self._logger.info(f"File '{output_filename}' downloaded successfully.")
return output_filename
else:
self._logger.error(f"Failed to download file. Status code: {response.status_code}")
return None

View File

@@ -0,0 +1,87 @@
import asyncio
import os
import logging
import random
from copy import deepcopy
import aiofiles
from httpx import AsyncClient
from app.dtos.video import Task, TaskStatus
from app.services.abc import IVideoGeneratorService
class Heygen(IVideoGeneratorService):
_GET_VIDEO_URL = 'https://api.heygen.com/v1/video_status.get'
def __init__(self, client: AsyncClient, token: str, avatars: dict):
super().__init__(deepcopy(avatars))
self._get_header = {
'X-Api-Key': token
}
self._post_header = {
'X-Api-Key': token,
'Content-Type': 'application/json'
}
self._http_client = client
self._logger = logging.getLogger(__name__)
async def create_video(self, text: str, avatar: str):
avatar = self._avatars[avatar]["id"]
create_video_url = f'https://api.heygen.com/v2/template/{avatar}/generate'
data = {
"test": False,
"caption": False,
"title": "video_title",
"variables": {
"script_here": {
"name": "script_here",
"type": "text",
"properties": {
"content": text
}
}
}
}
response = await self._http_client.post(create_video_url, headers=self._post_header, json=data)
self._logger.info(response.status_code)
self._logger.info(response.json())
video_id = response.json()["data"]["video_id"]
return Task(
result=video_id,
status=TaskStatus.STARTED,
)
async def poll_status(self, video_id: str) -> Task:
response = await self._http_client.get(self._GET_VIDEO_URL, headers=self._get_header, params={
'video_id': video_id
})
response_data = response.json()
status = response_data["data"]["status"]
error = response_data["data"]["error"]
if status != "completed" and error is None:
self._logger.info(f"Status: {status}")
return Task(
status=TaskStatus.IN_PROGRESS,
result=video_id
)
if error:
self._logger.error('Video creation failed.')
return Task(
status=TaskStatus.ERROR,
result=response_data.get('url')
)
url = response.json()['data']['video_url']
self._logger.info(f'Successfully generated video: {url}')
return Task(
status=TaskStatus.COMPLETED,
result=url
)

View File

@@ -0,0 +1,30 @@
{
"Matthew Noah": {
"id": "5912afa7c77c47d3883af3d874047aaf",
"avatar_gender": "male"
},
"Vera Cerise": {
"id": "9e58d96a383e4568a7f1e49df549e0e4",
"avatar_gender": "female"
},
"Edward Tony": {
"id": "d2cdd9c0379a4d06ae2afb6e5039bd0c",
"avatar_gender": "male"
},
"Tanya Molly": {
"id": "045cb5dcd00042b3a1e4f3bc1c12176b",
"avatar_gender": "female"
},
"Kayla Abbi": {
"id": "1ae1e5396cc444bfad332155fdb7a934",
"avatar_gender": "female"
},
"Jerome Ryan": {
"id": "0ee6aa7cc1084063a630ae514fccaa31",
"avatar_gender": "male"
},
"Tyler Christopher": {
"id": "5772cff935844516ad7eeff21f839e43",
"avatar_gender": "male"
}
}

View File

@@ -2,6 +2,7 @@ import json
import re
import logging
from typing import List, Optional, Callable, TypeVar
from openai import AsyncOpenAI
from openai.types.chat import ChatCompletionMessageParam
@@ -73,7 +74,6 @@ class OpenAI(ILLMService):
return await self._prediction(
model, messages, token_count, fields_to_check, temperature, (try_count + 1), check_blacklisted
)
return json.loads(result)
async def prediction_override(self, **kwargs):
@@ -123,7 +123,9 @@ class OpenAI(ILLMService):
while attempt < 3:
result = await self._client.chat.completions.create(**params)
result_content = result.choices[0].message.content
try:
print(result_content)
result_json = json.loads(result_content)
return map_to_model(result_json)
except Exception as e:

View File