Compare commits
33 Commits
new-custom
...
release/mo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
676f660f3e | ||
|
|
6cb7c07f57 | ||
|
|
a328f01d2e | ||
|
|
a931c5ec2e | ||
|
|
bfc9565e85 | ||
|
|
3d70bcbfd1 | ||
|
|
a2cfa335d7 | ||
|
|
0427d6e1b4 | ||
|
|
31c6ed570a | ||
|
|
3a27c42a69 | ||
|
|
260dba1ee6 | ||
|
|
a88d6bb568 | ||
|
|
f0f904f2e4 | ||
|
|
a23bbe581a | ||
|
|
bb26282d25 | ||
|
|
73c29cda25 | ||
|
|
aaa3361575 | ||
|
|
94a16b636d | ||
|
|
cffec795a7 | ||
|
|
b2b4dfb74e | ||
|
|
2716f52a0a | ||
|
|
4099d99f80 | ||
|
|
ab4db36445 | ||
|
|
59f047afba | ||
|
|
09b57cb346 | ||
|
|
bfc3e3f083 | ||
|
|
7b5e10fd79 | ||
|
|
a2a160f61b | ||
|
|
5d5cd21e1e | ||
|
|
06a8384f42 | ||
|
|
dd74a3d259 | ||
|
|
efff0b904e | ||
|
|
03f5b7d72c |
@@ -5,3 +5,4 @@ README.md
|
||||
*.pyd
|
||||
__pycache__
|
||||
.pytest_cache
|
||||
/scripts
|
||||
|
||||
6
.env
6
.env
@@ -1,6 +0,0 @@
|
||||
OPENAI_API_KEY=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
|
||||
JWT_SECRET_KEY=6e9c124ba92e8814719dcb0f21200c8aa4d0f119a994ac5e06eb90a366c83ab2
|
||||
JWT_TEST_TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0In0.Emrs2D3BmMP4b3zMjw0fJTPeyMwWEBDbxx2vvaWguO0
|
||||
GOOGLE_APPLICATION_CREDENTIALS=firebase-configs/storied-phalanx-349916.json
|
||||
HEY_GEN_TOKEN=MjY4MDE0MjdjZmNhNDFmYTlhZGRkNmI3MGFlMzYwZDItMTY5NTExNzY3MA==
|
||||
GPT_ZERO_API_KEY=0195b9bb24c5439899f71230809c74af
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -3,3 +3,4 @@ __pycache__
|
||||
.env
|
||||
.DS_Store
|
||||
/firebase-configs/test_firebase.json
|
||||
/scripts
|
||||
|
||||
3
.idea/ielts-be.iml
generated
3
.idea/ielts-be.iml
generated
@@ -7,6 +7,9 @@
|
||||
<orderEntry type="jdk" jdkName="Python 3.11 (ielts-be)" jdkType="Python SDK" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
<component name="PackageRequirementsSettings">
|
||||
<option name="versionSpecifier" value="Don't specify version" />
|
||||
</component>
|
||||
<component name="PyDocumentationSettings">
|
||||
<option name="format" value="GOOGLE" />
|
||||
<option name="myDocStringFormat" value="Google" />
|
||||
|
||||
18
Dockerfile
18
Dockerfile
@@ -11,7 +11,23 @@ ENV APP_HOME /app
|
||||
WORKDIR $APP_HOME
|
||||
COPY . ./
|
||||
|
||||
RUN apt update && apt install -y ffmpeg
|
||||
RUN apt update && apt install -y \
|
||||
ffmpeg \
|
||||
poppler-utils \
|
||||
texlive-latex-base \
|
||||
texlive-fonts-recommended \
|
||||
texlive-latex-extra \
|
||||
texlive-xetex \
|
||||
pandoc \
|
||||
librsvg2-bin \
|
||||
curl \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
|
||||
RUN curl -sL https://deb.nodesource.com/setup_20.x | bash - \
|
||||
&& apt-get install -y nodejs
|
||||
|
||||
RUN npm install -g firebase-tools
|
||||
|
||||
# Install production dependencies.
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
59
app.py
59
app.py
@@ -5,6 +5,7 @@ import firebase_admin
|
||||
from firebase_admin import credentials
|
||||
from flask import Flask, request
|
||||
from flask_jwt_extended import JWTManager, jwt_required
|
||||
from pymongo import MongoClient
|
||||
from sentence_transformers import SentenceTransformer
|
||||
|
||||
from helper.api_messages import *
|
||||
@@ -18,7 +19,11 @@ from helper.openai_interface import *
|
||||
from helper.question_templates import *
|
||||
from helper.speech_to_text_helper import *
|
||||
from heygen.AvatarEnum import AvatarEnum
|
||||
from training_content import TrainingContentService, TrainingContentKnowledgeBase, GPT
|
||||
from modules import GPT
|
||||
from modules.training_content import TrainingContentService, TrainingContentKnowledgeBase
|
||||
from modules.upload_level import UploadLevelService
|
||||
from modules.batch_users import BatchUsers
|
||||
|
||||
|
||||
load_dotenv()
|
||||
|
||||
@@ -40,8 +45,14 @@ embeddings = SentenceTransformer('all-MiniLM-L6-v2')
|
||||
kb = TrainingContentKnowledgeBase(embeddings)
|
||||
kb.load_indices_and_metadata()
|
||||
open_ai = GPT(OpenAI())
|
||||
firestore_client = firestore.client()
|
||||
tc_service = TrainingContentService(kb, open_ai, firestore_client)
|
||||
|
||||
mongo_db = MongoClient(os.getenv('MONGODB_URI'))[os.getenv('MONGODB_DB')]
|
||||
|
||||
tc_service = TrainingContentService(kb, open_ai, mongo_db)
|
||||
|
||||
upload_level_service = UploadLevelService(open_ai)
|
||||
|
||||
batch_users_service = BatchUsers(mongo_db)
|
||||
|
||||
thread_event = threading.Event()
|
||||
|
||||
@@ -149,7 +160,7 @@ def save_listening():
|
||||
else:
|
||||
template["variant"] = ExamVariant.FULL.value
|
||||
|
||||
(result, id) = save_to_db_with_id("listening", template, id)
|
||||
(result, id) = save_to_db_with_id(mongo_db, "listening", template, id)
|
||||
if result:
|
||||
return {**template, "id": id}
|
||||
else:
|
||||
@@ -959,7 +970,7 @@ def save_speaking():
|
||||
name=("thread-save-speaking-" + id)
|
||||
)
|
||||
thread.start()
|
||||
app.logger.info('Started thread to save speaking. Thread: ' + thread.getName())
|
||||
app.logger.info('Started thread to save speaking. Thread: ' + thread.name)
|
||||
|
||||
# Return response without waiting for create_videos_and_save_to_db to finish
|
||||
return {**template, "id": id}
|
||||
@@ -1189,7 +1200,7 @@ def get_reading_passage_3_question():
|
||||
def get_level_exam():
|
||||
try:
|
||||
number_of_exercises = 25
|
||||
exercises = gen_multiple_choice_level(number_of_exercises)
|
||||
exercises = gen_multiple_choice_level(mongo_db, number_of_exercises)
|
||||
return {
|
||||
"exercises": [exercises],
|
||||
"isDiagnostic": False,
|
||||
@@ -1282,7 +1293,7 @@ def get_level_utas():
|
||||
bs_2["questions"] = blank_space_text_2
|
||||
|
||||
# Reading text
|
||||
reading_text = gen_reading_passage_utas(87, 10, 4)
|
||||
reading_text = gen_reading_passage_utas(mongo_db, 87, 10, 4)
|
||||
print(json.dumps(reading_text, indent=4))
|
||||
reading["questions"] = reading_text
|
||||
|
||||
@@ -1309,6 +1320,7 @@ class CustomLevelExerciseTypes(Enum):
|
||||
MULTIPLE_CHOICE_4 = "multiple_choice_4"
|
||||
MULTIPLE_CHOICE_BLANK_SPACE = "multiple_choice_blank_space"
|
||||
MULTIPLE_CHOICE_UNDERLINED = "multiple_choice_underlined"
|
||||
FILL_BLANKS_MC = "fill_blanks_mc"
|
||||
BLANK_SPACE_TEXT = "blank_space_text"
|
||||
READING_PASSAGE_UTAS = "reading_passage_utas"
|
||||
WRITING_LETTER = "writing_letter"
|
||||
@@ -1406,6 +1418,14 @@ def get_custom_level():
|
||||
exercise_id = exercise_id + qty
|
||||
exercise_qty = exercise_qty - qty
|
||||
|
||||
elif exercise_type == CustomLevelExerciseTypes.FILL_BLANKS_MC.value:
|
||||
response["exercises"]["exercise_" + str(i)] = gen_fill_blanks_mc_utas(
|
||||
exercise_qty, exercise_id, exercise_text_size
|
||||
)
|
||||
response["exercises"]["exercise_" + str(i)]["type"] = "fillBlanks"
|
||||
response["exercises"]["exercise_" + str(i)]["variant"] = "mc"
|
||||
exercise_id = exercise_id + exercise_qty
|
||||
|
||||
elif exercise_type == CustomLevelExerciseTypes.BLANK_SPACE_TEXT.value:
|
||||
response["exercises"]["exercise_" + str(i)] = gen_blank_space_text_utas(exercise_qty, exercise_id,
|
||||
exercise_text_size)
|
||||
@@ -1690,8 +1710,29 @@ def grading_summary():
|
||||
@jwt_required()
|
||||
def training_content():
|
||||
try:
|
||||
data = request.get_json()
|
||||
return tc_service.get_tips(data)
|
||||
return tc_service.get_tips(request.get_json())
|
||||
except Exception as e:
|
||||
app.logger.error(str(e))
|
||||
return str(e)
|
||||
|
||||
|
||||
# TODO: create a doc in firestore with a status and get its id, run this in a thread and modify the doc in firestore,
|
||||
# return the id right away, in generation view poll for the id
|
||||
@app.route('/upload_level', methods=['POST'])
|
||||
def upload_file():
|
||||
if 'file' not in request.files:
|
||||
return 'File wasn\'t uploaded', 400
|
||||
file = request.files['file']
|
||||
if file.filename == '':
|
||||
return 'No selected file', 400
|
||||
if file:
|
||||
return upload_level_service.generate_level_from_file(file), 200
|
||||
|
||||
|
||||
@app.route('/batch_users', methods=['POST'])
|
||||
def create_users_batch():
|
||||
try:
|
||||
return batch_users_service.batch_users(request.get_json())
|
||||
except Exception as e:
|
||||
app.logger.error(str(e))
|
||||
return str(e)
|
||||
|
||||
@@ -5,6 +5,7 @@ import string
|
||||
import uuid
|
||||
|
||||
import nltk
|
||||
from pymongo.database import Database
|
||||
from wonderwords import RandomWord
|
||||
|
||||
from helper.constants import *
|
||||
@@ -1210,7 +1211,7 @@ def gen_write_blanks_form_exercise_listening_monologue(text: str, quantity: int,
|
||||
}
|
||||
|
||||
|
||||
def gen_multiple_choice_level(quantity: int, start_id=1):
|
||||
def gen_multiple_choice_level(mongo_db: Database, quantity: int, start_id=1):
|
||||
gen_multiple_choice_for_text = "Generate " + str(
|
||||
quantity) + " multiple choice questions of 4 options for an english level exam, some easy questions, some intermediate " \
|
||||
"questions and some advanced questions. Ensure that the questions cover a range of topics such as " \
|
||||
@@ -1240,9 +1241,9 @@ def gen_multiple_choice_level(quantity: int, start_id=1):
|
||||
GEN_QUESTION_TEMPERATURE)
|
||||
|
||||
if len(question["questions"]) != quantity:
|
||||
return gen_multiple_choice_level(quantity, start_id)
|
||||
return gen_multiple_choice_level(mongo_db, quantity, start_id)
|
||||
else:
|
||||
all_exams = get_all("level")
|
||||
all_exams = get_all(mongo_db, "level")
|
||||
seen_keys = set()
|
||||
for i in range(len(question["questions"])):
|
||||
question["questions"][i], seen_keys = replace_exercise_if_exists(all_exams, question["questions"][i],
|
||||
@@ -1563,6 +1564,66 @@ def gen_multiple_choice_underlined_utas(quantity: int, start_id: int, all_exams=
|
||||
return response
|
||||
|
||||
|
||||
def gen_fill_blanks_mc_utas(quantity: int, start_id: int, size: int, topic=random.choice(mti_topics)):
|
||||
json_format = {
|
||||
"question": {
|
||||
"solutions": [
|
||||
{
|
||||
"id": "<question id>",
|
||||
"solution": "<the option that holds the solution>"
|
||||
}
|
||||
],
|
||||
"words": [
|
||||
{
|
||||
"id": "<question id>",
|
||||
"options": {
|
||||
"A": "<a option>",
|
||||
"B": "<b option>",
|
||||
"C": "<c option>",
|
||||
"D": "<d option>"
|
||||
}
|
||||
}
|
||||
],
|
||||
"text": "text"
|
||||
}
|
||||
}
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": 'You are a helpful assistant designed to output JSON on this format: ' + str(json_format)
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": (
|
||||
f'Generate a text of at least {size} words about the topic {topic}. Make sure the text is structured '
|
||||
'in paragraphs formatted with newlines (\\n\\n) to delimit them.'
|
||||
)
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": (
|
||||
f'From the generated text choose {quantity} words (cannot be sequential words) to replace '
|
||||
'once with {{id}} where id starts on ' + str(start_id) + ' and is incremented for each word. '
|
||||
'For each word choose 4 options, 1 correct and the other ones false. Make sure that only 1 is the '
|
||||
'correct one amongst the 4 options and put the solution on the solutions array. '
|
||||
'The ids must be ordered throughout the text and the words must be replaced only once. Put the '
|
||||
'removed words and respective ids on the words array of the json in the correct order. You can\'t '
|
||||
'reference multiple times the same id across the text, if for example one of the chosen words is '
|
||||
'"word1" then word1 must be placed in the text with an id once, if word1 is referenced other '
|
||||
'times in the text then replace with the actual text of word.'
|
||||
)
|
||||
}
|
||||
]
|
||||
|
||||
token_count = count_total_tokens(messages)
|
||||
question = make_openai_call(GPT_4_O, messages, token_count,
|
||||
["question"],
|
||||
GEN_QUESTION_TEMPERATURE)
|
||||
|
||||
return question["question"]
|
||||
|
||||
|
||||
def gen_blank_space_text_utas(quantity: int, start_id: int, size: int, topic=random.choice(mti_topics)):
|
||||
json_format = {
|
||||
"question": {
|
||||
@@ -1617,10 +1678,10 @@ def gen_blank_space_text_utas(quantity: int, start_id: int, size: int, topic=ran
|
||||
return question["question"]
|
||||
|
||||
|
||||
def gen_reading_passage_utas(start_id, sa_quantity: int, mc_quantity: int, topic=random.choice(mti_topics)):
|
||||
def gen_reading_passage_utas(mongo_db: Database, start_id, sa_quantity: int, mc_quantity: int, topic=random.choice(mti_topics)):
|
||||
passage = generate_reading_passage_1_text(topic)
|
||||
short_answer = gen_short_answer_utas(passage["text"], start_id, sa_quantity)
|
||||
mc_exercises = gen_text_multiple_choice_utas(passage["text"], start_id + sa_quantity, mc_quantity)
|
||||
mc_exercises = gen_text_multiple_choice_utas(mongo_db, passage["text"], start_id + sa_quantity, mc_quantity)
|
||||
return {
|
||||
"exercises": {
|
||||
"shortAnswer": short_answer,
|
||||
@@ -1659,7 +1720,7 @@ def gen_short_answer_utas(text: str, start_id: int, sa_quantity: int):
|
||||
GEN_QUESTION_TEMPERATURE)["questions"]
|
||||
|
||||
|
||||
def gen_text_multiple_choice_utas(text: str, start_id: int, mc_quantity: int):
|
||||
def gen_text_multiple_choice_utas(mongo_db: Database, text: str, start_id: int, mc_quantity: int):
|
||||
json_format = {
|
||||
"questions": [
|
||||
{
|
||||
@@ -1711,7 +1772,7 @@ def gen_text_multiple_choice_utas(text: str, start_id: int, mc_quantity: int):
|
||||
GEN_QUESTION_TEMPERATURE)
|
||||
|
||||
if len(question["questions"]) != mc_quantity:
|
||||
return gen_multiple_choice_level(mc_quantity, start_id)
|
||||
return gen_multiple_choice_level(mongo_db, mc_quantity, start_id)
|
||||
else:
|
||||
response = fix_exercise_ids(question, start_id)
|
||||
response["questions"] = randomize_mc_options_order(response["questions"])
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import logging
|
||||
|
||||
from firebase_admin import firestore
|
||||
from google.cloud import storage
|
||||
from pymongo.database import Database
|
||||
|
||||
|
||||
def download_firebase_file(bucket_name, source_blob_name, destination_file_name):
|
||||
@@ -50,38 +50,16 @@ def upload_file_firebase_get_url(bucket_name, destination_blob_name, source_file
|
||||
return None
|
||||
|
||||
|
||||
def save_to_db(collection: str, item):
|
||||
db = firestore.client()
|
||||
collection_ref = db.collection(collection)
|
||||
(update_time, document_ref) = collection_ref.add(item)
|
||||
def save_to_db_with_id(mongo_db: Database, collection: str, item, id: str):
|
||||
collection_ref = mongo_db[collection]
|
||||
|
||||
document_ref = collection_ref.insert_one({"id": id, **item})
|
||||
if document_ref:
|
||||
logging.info(f"Document added with ID: {document_ref.id}")
|
||||
return (True, document_ref.id)
|
||||
logging.info(f"Document added with ID: {document_ref.inserted_id}")
|
||||
return (True, document_ref.inserted_id)
|
||||
else:
|
||||
return (False, None)
|
||||
|
||||
|
||||
def save_to_db_with_id(collection: str, item, id: str):
|
||||
db = firestore.client()
|
||||
collection_ref = db.collection(collection)
|
||||
# Reference to the specific document with the desired ID
|
||||
document_ref = collection_ref.document(id)
|
||||
# Set the data to the document
|
||||
document_ref.set(item)
|
||||
if document_ref:
|
||||
logging.info(f"Document added with ID: {document_ref.id}")
|
||||
return (True, document_ref.id)
|
||||
else:
|
||||
return (False, None)
|
||||
|
||||
|
||||
def get_all(collection: str):
|
||||
db = firestore.client()
|
||||
collection_ref = db.collection(collection)
|
||||
|
||||
all_exercises = (
|
||||
collection_ref
|
||||
.get()
|
||||
)
|
||||
|
||||
return all_exercises
|
||||
def get_all(mongo_db: Database, collection: str):
|
||||
return list(mongo_db[collection].find())
|
||||
|
||||
5
modules/__init__.py
Normal file
5
modules/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .gpt import GPT
|
||||
|
||||
__all__ = [
|
||||
"GPT"
|
||||
]
|
||||
5
modules/batch_users/__init__.py
Normal file
5
modules/batch_users/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .service import BatchUsers
|
||||
|
||||
__all__ = [
|
||||
"BatchUsers"
|
||||
]
|
||||
31
modules/batch_users/batch_users.py
Normal file
31
modules/batch_users/batch_users.py
Normal file
@@ -0,0 +1,31 @@
|
||||
import uuid
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class DemographicInfo(BaseModel):
|
||||
phone: str
|
||||
passport_id: Optional[str] = None
|
||||
country: Optional[str] = None
|
||||
|
||||
|
||||
class UserDTO(BaseModel):
|
||||
id: uuid.UUID = Field(default_factory=uuid.uuid4)
|
||||
email: str
|
||||
name: str
|
||||
type: str
|
||||
passport_id: str
|
||||
passwordHash: str
|
||||
passwordSalt: str
|
||||
groupName: Optional[str] = None
|
||||
corporate: Optional[str] = None
|
||||
studentID: Optional[str] = None
|
||||
expiryDate: Optional[str] = None
|
||||
demographicInformation: Optional[DemographicInfo] = None
|
||||
|
||||
|
||||
class BatchUsersDTO(BaseModel):
|
||||
makerID: str
|
||||
users: list[UserDTO]
|
||||
269
modules/batch_users/service.py
Normal file
269
modules/batch_users/service.py
Normal file
@@ -0,0 +1,269 @@
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from logging import getLogger
|
||||
|
||||
import pandas as pd
|
||||
from typing import Dict
|
||||
|
||||
import shortuuid
|
||||
from pymongo.database import Database
|
||||
|
||||
from modules.batch_users.batch_users import BatchUsersDTO, UserDTO
|
||||
from modules.helper.file_helper import FileHelper
|
||||
|
||||
|
||||
class BatchUsers:
|
||||
|
||||
_DEFAULT_DESIRED_LEVELS = {
|
||||
"reading": 9,
|
||||
"listening": 9,
|
||||
"writing": 9,
|
||||
"speaking": 9,
|
||||
}
|
||||
|
||||
_DEFAULT_LEVELS = {
|
||||
"reading": 0,
|
||||
"listening": 0,
|
||||
"writing": 0,
|
||||
"speaking": 0,
|
||||
}
|
||||
|
||||
def __init__(self, mongo: Database):
|
||||
self._db: Database = mongo
|
||||
self._logger = getLogger(__name__)
|
||||
|
||||
def batch_users(self, request_data: Dict):
|
||||
batch_dto = self._map_to_batch(request_data)
|
||||
|
||||
file_name = f'{uuid.uuid4()}.csv'
|
||||
path = f'./tmp/{file_name}'
|
||||
self._generate_firebase_auth_csv(batch_dto, path)
|
||||
|
||||
result = self._upload_users('./tmp', file_name)
|
||||
if result.returncode != 0:
|
||||
error_msg = f"Couldn't upload users. Failed to run command firebase auth import -> ```cmd {result.stderr}```"
|
||||
self._logger.error(error_msg)
|
||||
return error_msg
|
||||
|
||||
self._init_users(batch_dto)
|
||||
|
||||
FileHelper.remove_file(path)
|
||||
return {"ok": True}
|
||||
|
||||
@staticmethod
|
||||
def _map_to_batch(request_data: Dict) -> BatchUsersDTO:
|
||||
users: list[UserDTO] = [UserDTO(**user) for user in request_data["users"]]
|
||||
return BatchUsersDTO(makerID=request_data["makerID"], users=users)
|
||||
|
||||
@staticmethod
|
||||
def _generate_firebase_auth_csv(batch_dto: BatchUsersDTO, path: str):
|
||||
# https://firebase.google.com/docs/cli/auth#file_format
|
||||
columns = [
|
||||
'UID', 'Email', 'Email Verified', 'Password Hash', 'Password Salt', 'Name',
|
||||
'Photo URL', 'Google ID', 'Google Email', 'Google Display Name', 'Google Photo URL',
|
||||
'Facebook ID', 'Facebook Email', 'Facebook Display Name', 'Facebook Photo URL',
|
||||
'Twitter ID', 'Twitter Email', 'Twitter Display Name', 'Twitter Photo URL',
|
||||
'GitHub ID', 'GitHub Email', 'GitHub Display Name', 'GitHub Photo URL',
|
||||
'User Creation Time', 'Last Sign-In Time', 'Phone Number'
|
||||
]
|
||||
users_data = []
|
||||
|
||||
current_time = int(time.time() * 1000)
|
||||
|
||||
for user in batch_dto.users:
|
||||
user_data = {
|
||||
'UID': str(user.id),
|
||||
'Email': user.email,
|
||||
'Email Verified': False,
|
||||
'Password Hash': user.passwordHash,
|
||||
'Password Salt': user.passwordSalt,
|
||||
'Name': '',
|
||||
'Photo URL': '',
|
||||
'Google ID': '',
|
||||
'Google Email': '',
|
||||
'Google Display Name': '',
|
||||
'Google Photo URL': '',
|
||||
'Facebook ID': '',
|
||||
'Facebook Email': '',
|
||||
'Facebook Display Name': '',
|
||||
'Facebook Photo URL': '',
|
||||
'Twitter ID': '',
|
||||
'Twitter Email': '',
|
||||
'Twitter Display Name': '',
|
||||
'Twitter Photo URL': '',
|
||||
'GitHub ID': '',
|
||||
'GitHub Email': '',
|
||||
'GitHub Display Name': '',
|
||||
'GitHub Photo URL': '',
|
||||
'User Creation Time': current_time,
|
||||
'Last Sign-In Time': '',
|
||||
'Phone Number': ''
|
||||
}
|
||||
users_data.append(user_data)
|
||||
|
||||
df = pd.DataFrame(users_data, columns=columns)
|
||||
df.to_csv(path, index=False, header=False)
|
||||
|
||||
@staticmethod
|
||||
def _upload_users(directory: str, file_name: str):
|
||||
command = (
|
||||
f'firebase auth:import {file_name} '
|
||||
f'--hash-algo=SCRYPT '
|
||||
f'--hash-key={os.getenv("FIREBASE_SCRYPT_B64_SIGNER_KEY")} '
|
||||
f'--salt-separator={os.getenv("FIREBASE_SCRYPT_B64_SALT_SEPARATOR")} '
|
||||
f'--rounds={os.getenv("FIREBASE_SCRYPT_ROUNDS")} '
|
||||
f'--mem-cost={os.getenv("FIREBASE_SCRYPT_MEM_COST")} '
|
||||
f'--project={os.getenv("FIREBASE_PROJECT_ID")} '
|
||||
)
|
||||
|
||||
result = subprocess.run(command, shell=True, cwd=directory, capture_output=True, text=True)
|
||||
return result
|
||||
|
||||
def _init_users(self, batch_users: BatchUsersDTO):
|
||||
maker_id = batch_users.makerID
|
||||
for user in batch_users.users:
|
||||
self._insert_new_user(user)
|
||||
code = self._create_code(user, maker_id)
|
||||
|
||||
if user.type == "corporate":
|
||||
self._set_corporate_default_groups(user)
|
||||
|
||||
if user.corporate:
|
||||
self._assign_corporate_to_user(user, code)
|
||||
|
||||
if user.groupName and len(user.groupName.strip()) > 0:
|
||||
self._assign_user_to_group_by_name(user, maker_id)
|
||||
|
||||
def _insert_new_user(self, user: UserDTO):
|
||||
new_user = {
|
||||
**user.dict(exclude={
|
||||
'passport_id', 'groupName', 'expiryDate',
|
||||
'corporate', 'passwordHash', 'passwordSalt'
|
||||
}),
|
||||
'bio': "",
|
||||
'focus': "academic",
|
||||
'status': "active",
|
||||
'desiredLevels': self._DEFAULT_DESIRED_LEVELS,
|
||||
'profilePicture': "/defaultAvatar.png",
|
||||
'levels': self._DEFAULT_LEVELS,
|
||||
'isFirstLogin': False,
|
||||
'isVerified': True,
|
||||
'registrationDate': datetime.now(),
|
||||
'subscriptionExpirationDate': user.expiryDate
|
||||
}
|
||||
self._db.users.insert_one(new_user)
|
||||
|
||||
def _create_code(self, user: UserDTO, maker_id: str) -> str:
|
||||
code = shortuuid.ShortUUID().random(length=6)
|
||||
self._db.codes.insert_one({
|
||||
'id': code,
|
||||
'code': code,
|
||||
'creator': maker_id,
|
||||
'expiryDate': user.expiryDate,
|
||||
'type': user.type,
|
||||
'creationDate': datetime.now(),
|
||||
'userId': str(user.id),
|
||||
'email': user.email,
|
||||
'name': user.name,
|
||||
'passport_id': user.passport_id
|
||||
})
|
||||
return code
|
||||
|
||||
def _set_corporate_default_groups(self, user: UserDTO):
|
||||
user_id = str(user.id)
|
||||
default_groups = [
|
||||
{
|
||||
'admin': user_id,
|
||||
'id': str(uuid.uuid4()),
|
||||
'name': "Teachers",
|
||||
'participants': [],
|
||||
'disableEditing': True,
|
||||
},
|
||||
{
|
||||
'admin': user_id,
|
||||
'id': str(uuid.uuid4()),
|
||||
'name': "Students",
|
||||
'participants': [],
|
||||
'disableEditing': True,
|
||||
},
|
||||
{
|
||||
'admin': user_id,
|
||||
'id': str(uuid.uuid4()),
|
||||
'name': "Corporate",
|
||||
'participants': [],
|
||||
'disableEditing': True,
|
||||
}
|
||||
]
|
||||
for group in default_groups:
|
||||
self._db.groups.insert_one(group)
|
||||
|
||||
def _assign_corporate_to_user(self, user: UserDTO, code: str):
|
||||
user_id = str(user.id)
|
||||
corporate_user = self._db.users.find_one(
|
||||
{"email": user.corporate}
|
||||
)
|
||||
if corporate_user:
|
||||
self._db.codes.update_one(
|
||||
{"id": code},
|
||||
{"$set": {"creator": corporate_user.id}},
|
||||
upsert=True
|
||||
)
|
||||
group_type = "Students" if user.type == "student" else "Teachers"
|
||||
|
||||
group = self._db.groups.find_one(
|
||||
{
|
||||
"admin": corporate_user.id,
|
||||
"name": group_type
|
||||
}
|
||||
)
|
||||
|
||||
if group:
|
||||
participants = group['participants']
|
||||
if user_id not in participants:
|
||||
participants.append(user_id)
|
||||
self._db.groups.update_one(
|
||||
{"id": group.id},
|
||||
{"$set": {"participants": participants}}
|
||||
)
|
||||
|
||||
else:
|
||||
group = {
|
||||
'admin': corporate_user.id,
|
||||
'id': str(uuid.uuid4()),
|
||||
'name': group_type,
|
||||
'participants': [user_id],
|
||||
'disableEditing': True,
|
||||
}
|
||||
|
||||
self._db.groups.insert_one(group)
|
||||
|
||||
def _assign_user_to_group_by_name(self, user: UserDTO, maker_id: str):
|
||||
user_id = str(user.id)
|
||||
|
||||
group = self._db.groups.find_one(
|
||||
{
|
||||
"admin": maker_id,
|
||||
"name": user.group_name.strip()
|
||||
}
|
||||
)
|
||||
|
||||
if group:
|
||||
new_group = {
|
||||
'id': str(uuid.uuid4()),
|
||||
'admin': maker_id,
|
||||
'name': user.groupName.strip(),
|
||||
'participants': [user_id],
|
||||
'disableEditing': False,
|
||||
}
|
||||
self._db.groups.insert_one(new_group)
|
||||
else:
|
||||
participants = group.participants
|
||||
if user_id not in participants:
|
||||
participants.append(user_id)
|
||||
self._db.groups.update_one(
|
||||
{"id": group.id},
|
||||
{"$set": {"participants": participants}}
|
||||
)
|
||||
@@ -1,17 +1,19 @@
|
||||
import json
|
||||
from logging import getLogger
|
||||
|
||||
from typing import List, Optional, Callable
|
||||
from typing import List, Optional, Callable, TypeVar
|
||||
|
||||
from openai.types.chat import ChatCompletionMessageParam
|
||||
from pydantic import BaseModel
|
||||
|
||||
T = TypeVar('T', bound=BaseModel)
|
||||
|
||||
|
||||
class GPT:
|
||||
|
||||
def __init__(self, openai_client):
|
||||
self._client = openai_client
|
||||
self._default_model = "gpt-4o"
|
||||
self._default_model = "gpt-4o-2024-08-06"
|
||||
self._logger = getLogger(__name__)
|
||||
|
||||
def prediction(
|
||||
@@ -23,7 +25,7 @@ class GPT:
|
||||
model: Optional[str] = None,
|
||||
temperature: Optional[float] = None,
|
||||
max_retries: int = 3
|
||||
) -> List[BaseModel] | BaseModel | str | None:
|
||||
) -> List[T] | T | None:
|
||||
params = {
|
||||
"messages": messages,
|
||||
"response_format": {"type": "json_object"},
|
||||
5
modules/helper/__init__.py
Normal file
5
modules/helper/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .logger import LoggerHelper
|
||||
|
||||
__all__ = [
|
||||
"LoggerHelper"
|
||||
]
|
||||
97
modules/helper/file_helper.py
Normal file
97
modules/helper/file_helper.py
Normal file
@@ -0,0 +1,97 @@
|
||||
import base64
|
||||
import io
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import uuid
|
||||
from typing import Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
import pypandoc
|
||||
from PIL import Image
|
||||
|
||||
|
||||
class FileHelper:
|
||||
|
||||
# Supposedly pandoc covers a wide range of file extensions only tested with docx
|
||||
@staticmethod
|
||||
def convert_file_to_pdf(input_path: str, output_path: str):
|
||||
pypandoc.convert_file(input_path, 'pdf', outputfile=output_path, extra_args=[
|
||||
'-V', 'geometry:paperwidth=5.5in',
|
||||
'-V', 'geometry:paperheight=8.5in',
|
||||
'-V', 'geometry:margin=0.5in',
|
||||
'-V', 'pagestyle=empty'
|
||||
])
|
||||
|
||||
@staticmethod
|
||||
def convert_file_to_html(input_path: str, output_path: str):
|
||||
pypandoc.convert_file(input_path, 'html', outputfile=output_path)
|
||||
|
||||
@staticmethod
|
||||
def pdf_to_png(path_id: str):
|
||||
to_png = f"pdftoppm -png exercises.pdf page"
|
||||
result = subprocess.run(to_png, shell=True, cwd=f'./tmp/{path_id}', capture_output=True, text=True)
|
||||
if result.returncode != 0:
|
||||
raise Exception(
|
||||
f"Couldn't convert pdf to png. Failed to run command '{to_png}' -> ```cmd {result.stderr}```")
|
||||
|
||||
@staticmethod
|
||||
def is_page_blank(image_bytes: bytes, image_threshold=10) -> bool:
|
||||
with Image.open(io.BytesIO(image_bytes)) as img:
|
||||
img_gray = img.convert('L')
|
||||
img_array = np.array(img_gray)
|
||||
non_white_pixels = np.sum(img_array < 255)
|
||||
|
||||
return non_white_pixels <= image_threshold
|
||||
|
||||
@classmethod
|
||||
def _encode_image(cls, image_path: str, image_threshold=10) -> Optional[str]:
|
||||
with open(image_path, "rb") as image_file:
|
||||
image_bytes = image_file.read()
|
||||
|
||||
if cls.is_page_blank(image_bytes, image_threshold):
|
||||
return None
|
||||
|
||||
return base64.b64encode(image_bytes).decode('utf-8')
|
||||
|
||||
@classmethod
|
||||
def b64_pngs(cls, path_id: str, files: list[str]):
|
||||
png_messages = []
|
||||
for filename in files:
|
||||
b64_string = cls._encode_image(os.path.join(f'./tmp/{path_id}', filename))
|
||||
if b64_string:
|
||||
png_messages.append({
|
||||
"type": "image_url",
|
||||
"image_url": {
|
||||
"url": f"data:image/png;base64,{b64_string}"
|
||||
}
|
||||
})
|
||||
return png_messages
|
||||
|
||||
@staticmethod
|
||||
def remove_directory(path):
|
||||
try:
|
||||
if os.path.exists(path):
|
||||
if os.path.isdir(path):
|
||||
shutil.rmtree(path)
|
||||
except Exception as e:
|
||||
print(f"An error occurred while trying to remove {path}: {str(e)}")
|
||||
|
||||
@staticmethod
|
||||
def remove_file(file_path):
|
||||
try:
|
||||
if os.path.exists(file_path):
|
||||
if os.path.isfile(file_path):
|
||||
os.remove(file_path)
|
||||
except Exception as e:
|
||||
print(f"An error occurred while trying to remove the file {file_path}: {str(e)}")
|
||||
|
||||
@staticmethod
|
||||
def save_upload(file) -> Tuple[str, str]:
|
||||
ext = file.filename.split('.')[-1]
|
||||
path_id = str(uuid.uuid4())
|
||||
os.makedirs(f'./tmp/{path_id}', exist_ok=True)
|
||||
|
||||
tmp_filename = f'./tmp/{path_id}/uploaded.{ext}'
|
||||
file.save(tmp_filename)
|
||||
return ext, path_id
|
||||
23
modules/helper/logger.py
Normal file
23
modules/helper/logger.py
Normal file
@@ -0,0 +1,23 @@
|
||||
import logging
|
||||
from functools import wraps
|
||||
|
||||
|
||||
class LoggerHelper:
|
||||
|
||||
@staticmethod
|
||||
def suppress_loggers():
|
||||
def decorator(f):
|
||||
@wraps(f)
|
||||
def wrapped(*args, **kwargs):
|
||||
root_logger = logging.getLogger()
|
||||
original_level = root_logger.level
|
||||
|
||||
root_logger.setLevel(logging.ERROR)
|
||||
|
||||
try:
|
||||
return f(*args, **kwargs)
|
||||
finally:
|
||||
root_logger.setLevel(original_level)
|
||||
|
||||
return wrapped
|
||||
return decorator
|
||||
@@ -1,9 +1,7 @@
|
||||
from .kb import TrainingContentKnowledgeBase
|
||||
from .service import TrainingContentService
|
||||
from .gpt import GPT
|
||||
|
||||
__all__ = [
|
||||
"TrainingContentService",
|
||||
"TrainingContentKnowledgeBase",
|
||||
"GPT"
|
||||
"TrainingContentKnowledgeBase"
|
||||
]
|
||||
@@ -1,10 +1,13 @@
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from logging import getLogger
|
||||
|
||||
from typing import Dict, List
|
||||
|
||||
from training_content.dtos import TrainingContentDTO, WeakAreaDTO, QueryDTO, DetailsDTO, TipsDTO
|
||||
from pymongo.database import Database
|
||||
|
||||
from modules.training_content.dtos import TrainingContentDTO, WeakAreaDTO, QueryDTO, DetailsDTO, TipsDTO
|
||||
|
||||
|
||||
class TrainingContentService:
|
||||
@@ -19,13 +22,14 @@ class TrainingContentService:
|
||||
]
|
||||
# strategy word_link ct_focus reading_skill word_partners writing_skill language_for_writing
|
||||
|
||||
def __init__(self, kb, openai, firestore):
|
||||
def __init__(self, kb, openai, mongo: Database):
|
||||
self._training_content_module = kb
|
||||
self._db = firestore
|
||||
self._db: Database = mongo
|
||||
self._logger = getLogger(__name__)
|
||||
self._llm = openai
|
||||
|
||||
def get_tips(self, stats):
|
||||
def get_tips(self, training_content):
|
||||
user, stats = training_content["userID"], training_content["stats"]
|
||||
exam_data, exam_map = self._sort_out_solutions(stats)
|
||||
training_content = self._get_exam_details_and_tips(exam_data)
|
||||
tips = self._query_kb(training_content.queries)
|
||||
@@ -36,16 +40,18 @@ class TrainingContentService:
|
||||
for area in training_content.weak_areas:
|
||||
weak_areas["weak_areas"].append(area.dict())
|
||||
|
||||
new_id = uuid.uuid4()
|
||||
training_doc = {
|
||||
'id': new_id,
|
||||
'created_at': int(datetime.now().timestamp() * 1000),
|
||||
**exam_map,
|
||||
**usefull_tips.dict(),
|
||||
**weak_areas
|
||||
**weak_areas,
|
||||
"user": user
|
||||
}
|
||||
doc_ref = self._db.collection('training').add(training_doc)
|
||||
|
||||
self._db.training.insert_one(training_doc)
|
||||
return {
|
||||
"id": doc_ref[1].id
|
||||
"id": new_id
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
@@ -106,7 +112,15 @@ class TrainingContentService:
|
||||
'for tips that will be displayed to the student, the category attribute is a collection of '
|
||||
'embeddings and the text will be the text used to query the knowledge base. The categories are '
|
||||
f'the following [{", ".join(self.TOOLS)}]. The exam data will be a json where the key of the field '
|
||||
'"exams" is the exam id, an exam can be composed of multiple modules or single modules.'
|
||||
'"exams" is the exam id, an exam can be composed of multiple modules or single modules. The student'
|
||||
' will see your response so refrain from using phrasing like "The student" did x, y and z. If the '
|
||||
'field "answer" in a question is an empty array "[]", then the student didn\'t answer any question '
|
||||
'and you must address that in your response. Also questions aren\'t modules, the only modules are: '
|
||||
'level, speaking, writing, reading and listening. The details array needs to be tailored to the '
|
||||
'exam attempt, even if you receive the same exam you must treat as different exams by their id.'
|
||||
'Don\'t make references to an exam by it\'s id, the GUI will handle that so the student knows '
|
||||
'which is the exam your comments and summary are referencing too. Even if the student hasn\'t '
|
||||
'submitted no answers for an exam, you must still fill the details structure addressing that fact.'
|
||||
)
|
||||
},
|
||||
{
|
||||
@@ -203,13 +217,15 @@ class TrainingContentService:
|
||||
exercises[session_key][module][exam_id]["exercises"].extend(
|
||||
self._get_speaking_solutions(stat, exam)
|
||||
)
|
||||
elif module == "level": # same structure as listening
|
||||
elif module == "level":
|
||||
exercises[session_key][module][exam_id]["exercises"].extend(
|
||||
self._get_listening_solutions(stat, exam)
|
||||
self._get_level_solutions(stat, exam)
|
||||
)
|
||||
|
||||
exam_map[session_key]["score"] = round((exam_total_correct / exam_total_questions) * 100)
|
||||
exam_map[session_key]["module"] = module
|
||||
with open('exam_result.json', 'w') as file:
|
||||
json.dump({"exams": exercises}, file, indent=4)
|
||||
|
||||
return {"exams": exercises}, exam_map
|
||||
|
||||
@@ -237,6 +253,54 @@ class TrainingContentService:
|
||||
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _get_mc_question(exercise, stat):
|
||||
shuffle_maps = stat.get("shuffleMaps", [])
|
||||
answer = stat["solutions"] if len(shuffle_maps) == 0 else []
|
||||
if len(shuffle_maps) != 0:
|
||||
for solution in stat["solutions"]:
|
||||
shuffle_map = [
|
||||
item["map"] for item in shuffle_maps
|
||||
if item["questionID"] == solution["question"]
|
||||
]
|
||||
answer.append({
|
||||
"question": solution["question"],
|
||||
"option": shuffle_map[solution["option"]]
|
||||
})
|
||||
return {
|
||||
"question": exercise["prompt"],
|
||||
"exercise": exercise["questions"],
|
||||
"answer": stat["solutions"]
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _swap_key_name(d, original_key, new_key):
|
||||
d[new_key] = d.pop(original_key)
|
||||
return d
|
||||
|
||||
def _get_level_solutions(self, stat, exam):
|
||||
result = []
|
||||
try:
|
||||
for part in exam["parts"]:
|
||||
for exercise in part["exercises"]:
|
||||
if exercise["id"] == stat["exercise"]:
|
||||
if stat["type"] == "fillBlanks":
|
||||
result.append({
|
||||
"prompt": exercise["prompt"],
|
||||
"template": exercise["text"],
|
||||
"words": exercise["words"],
|
||||
"solutions": exercise["solutions"],
|
||||
"answer": [
|
||||
self._swap_key_name(item, 'solution', 'option')
|
||||
for item in stat["solutions"]
|
||||
]
|
||||
})
|
||||
elif stat["type"] == "multipleChoice":
|
||||
result.append(self._get_mc_question(exercise, stat))
|
||||
except KeyError as e:
|
||||
self._logger.warning(f"Malformed stat object: {str(e)}")
|
||||
return result
|
||||
|
||||
def _get_listening_solutions(self, stat, exam):
|
||||
result = []
|
||||
try:
|
||||
@@ -250,16 +314,25 @@ class TrainingContentService:
|
||||
"solution": exercise["solutions"],
|
||||
"answer": stat["solutions"]
|
||||
})
|
||||
elif stat["type"] == "multipleChoice":
|
||||
elif stat["type"] == "fillBlanks":
|
||||
result.append({
|
||||
"question": exercise["prompt"],
|
||||
"exercise": exercise["questions"],
|
||||
"template": exercise["text"],
|
||||
"words": exercise["words"],
|
||||
"solutions": exercise["solutions"],
|
||||
"answer": stat["solutions"]
|
||||
})
|
||||
elif stat["type"] == "multipleChoice":
|
||||
result.append(self._get_mc_question(exercise, stat))
|
||||
|
||||
except KeyError as e:
|
||||
self._logger.warning(f"Malformed stat object: {str(e)}")
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _find_shuffle_map(shuffle_maps, question_id):
|
||||
return next((item["map"] for item in shuffle_maps if item["questionID"] == question_id), None)
|
||||
|
||||
def _get_speaking_solutions(self, stat, exam):
|
||||
result = {}
|
||||
try:
|
||||
@@ -332,10 +405,5 @@ class TrainingContentService:
|
||||
return result
|
||||
|
||||
def _get_doc_by_id(self, collection: str, doc_id: str):
|
||||
collection_ref = self._db.collection(collection)
|
||||
doc_ref = collection_ref.document(doc_id)
|
||||
doc = doc_ref.get()
|
||||
|
||||
if doc.exists:
|
||||
return doc.to_dict()
|
||||
return None
|
||||
doc = self._db[collection].find_one({"id": doc_id})
|
||||
return doc
|
||||
5
modules/upload_level/__init__.py
Normal file
5
modules/upload_level/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .service import UploadLevelService
|
||||
|
||||
__all__ = [
|
||||
"UploadLevelService"
|
||||
]
|
||||
57
modules/upload_level/exam_dtos.py
Normal file
57
modules/upload_level/exam_dtos.py
Normal file
@@ -0,0 +1,57 @@
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import List, Dict, Union, Optional, Any
|
||||
from uuid import uuid4, UUID
|
||||
|
||||
|
||||
class Option(BaseModel):
|
||||
id: str
|
||||
text: str
|
||||
|
||||
|
||||
class MultipleChoiceQuestion(BaseModel):
|
||||
id: str
|
||||
prompt: str
|
||||
variant: str = "text"
|
||||
solution: str
|
||||
options: List[Option]
|
||||
|
||||
|
||||
class MultipleChoiceExercise(BaseModel):
|
||||
id: UUID = Field(default_factory=uuid4)
|
||||
type: str = "multipleChoice"
|
||||
prompt: str = "Select the appropriate option."
|
||||
questions: List[MultipleChoiceQuestion]
|
||||
userSolutions: List = Field(default_factory=list)
|
||||
|
||||
|
||||
class FillBlanksWord(BaseModel):
|
||||
id: str
|
||||
options: Dict[str, str]
|
||||
|
||||
|
||||
class FillBlanksSolution(BaseModel):
|
||||
id: str
|
||||
solution: str
|
||||
|
||||
|
||||
class FillBlanksExercise(BaseModel):
|
||||
id: UUID = Field(default_factory=uuid4)
|
||||
type: str = "fillBlanks"
|
||||
variant: str = "mc"
|
||||
prompt: str = "Click a blank to select the appropriate word for it."
|
||||
text: str
|
||||
solutions: List[FillBlanksSolution]
|
||||
words: List[FillBlanksWord]
|
||||
userSolutions: List = Field(default_factory=list)
|
||||
|
||||
|
||||
Exercise = Union[MultipleChoiceExercise, FillBlanksExercise]
|
||||
|
||||
|
||||
class Part(BaseModel):
|
||||
exercises: List[Exercise]
|
||||
context: Optional[str] = Field(default=None)
|
||||
|
||||
|
||||
class Exam(BaseModel):
|
||||
parts: List[Part]
|
||||
66
modules/upload_level/mapper.py
Normal file
66
modules/upload_level/mapper.py
Normal file
@@ -0,0 +1,66 @@
|
||||
from typing import Dict, Any
|
||||
|
||||
from pydantic import ValidationError
|
||||
|
||||
from modules.upload_level.exam_dtos import (
|
||||
MultipleChoiceExercise,
|
||||
FillBlanksExercise,
|
||||
Part, Exam
|
||||
)
|
||||
from modules.upload_level.sheet_dtos import Sheet, Option, MultipleChoiceQuestion, FillBlanksWord
|
||||
|
||||
|
||||
class ExamMapper:
|
||||
|
||||
@staticmethod
|
||||
def map_to_exam_model(response: Dict[str, Any]) -> Exam:
|
||||
parts = []
|
||||
for part in response['parts']:
|
||||
part_exercises = part['exercises']
|
||||
context = part.get('context', None)
|
||||
|
||||
exercises = []
|
||||
for exercise in part_exercises:
|
||||
exercise_type = exercise['type']
|
||||
if exercise_type == 'multipleChoice':
|
||||
exercise_model = MultipleChoiceExercise(**exercise)
|
||||
elif exercise_type == 'fillBlanks':
|
||||
exercise_model = FillBlanksExercise(**exercise)
|
||||
else:
|
||||
raise ValidationError(f"Unknown exercise type: {exercise_type}")
|
||||
|
||||
exercises.append(exercise_model)
|
||||
|
||||
part_kwargs = {"exercises": exercises}
|
||||
if context is not None:
|
||||
part_kwargs["context"] = context
|
||||
|
||||
part_model = Part(**part_kwargs)
|
||||
parts.append(part_model)
|
||||
|
||||
return Exam(parts=parts)
|
||||
|
||||
@staticmethod
|
||||
def map_to_sheet(response: Dict[str, Any]) -> Sheet:
|
||||
components = []
|
||||
|
||||
for item in response["components"]:
|
||||
component_type = item["type"]
|
||||
|
||||
if component_type == "multipleChoice":
|
||||
options = [Option(id=opt["id"], text=opt["text"]) for opt in item["options"]]
|
||||
components.append(MultipleChoiceQuestion(
|
||||
id=item["id"],
|
||||
prompt=item["prompt"],
|
||||
variant=item.get("variant", "text"),
|
||||
options=options
|
||||
))
|
||||
elif component_type == "fillBlanks":
|
||||
components.append(FillBlanksWord(
|
||||
id=item["id"],
|
||||
options=item["options"]
|
||||
))
|
||||
else:
|
||||
components.append(item)
|
||||
|
||||
return Sheet(components=components)
|
||||
385
modules/upload_level/service.py
Normal file
385
modules/upload_level/service.py
Normal file
@@ -0,0 +1,385 @@
|
||||
import json
|
||||
import os
|
||||
import uuid
|
||||
from logging import getLogger
|
||||
|
||||
from typing import Dict, Any, Tuple, Callable
|
||||
|
||||
import pdfplumber
|
||||
|
||||
from modules import GPT
|
||||
from modules.helper.file_helper import FileHelper
|
||||
from modules.helper import LoggerHelper
|
||||
from modules.upload_level.exam_dtos import Exam
|
||||
from modules.upload_level.mapper import ExamMapper
|
||||
from modules.upload_level.sheet_dtos import Sheet
|
||||
|
||||
|
||||
class UploadLevelService:
|
||||
def __init__(self, openai: GPT):
|
||||
self._logger = getLogger(__name__)
|
||||
self._llm = openai
|
||||
|
||||
def generate_level_from_file(self, file) -> Dict[str, Any] | None:
|
||||
ext, path_id = FileHelper.save_upload(file)
|
||||
FileHelper.convert_file_to_pdf(
|
||||
f'./tmp/{path_id}/uploaded.{ext}', f'./tmp/{path_id}/exercises.pdf'
|
||||
)
|
||||
file_has_images = self._check_pdf_for_images(f'./tmp/{path_id}/exercises.pdf')
|
||||
|
||||
if not file_has_images:
|
||||
FileHelper.convert_file_to_html(f'./tmp/{path_id}/uploaded.{ext}', f'./tmp/{path_id}/exercises.html')
|
||||
|
||||
completion: Callable[[str], Exam] = self._png_completion if file_has_images else self._html_completion
|
||||
response = completion(path_id)
|
||||
|
||||
FileHelper.remove_directory(f'./tmp/{path_id}')
|
||||
|
||||
if response:
|
||||
return self.fix_ids(response.dict(exclude_none=True))
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
@LoggerHelper.suppress_loggers()
|
||||
def _check_pdf_for_images(pdf_path: str) -> bool:
|
||||
with pdfplumber.open(pdf_path) as pdf:
|
||||
for page in pdf.pages:
|
||||
if page.images:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _level_json_schema(self):
|
||||
return {
|
||||
"parts": [
|
||||
{
|
||||
"context": "<this attribute is optional you may exclude it if not required>",
|
||||
"exercises": [
|
||||
self._multiple_choice_html(),
|
||||
self._passage_blank_space_html()
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
def _html_completion(self, path_id: str) -> Exam:
|
||||
with open(f'./tmp/{path_id}/exercises.html', 'r', encoding='utf-8') as f:
|
||||
html = f.read()
|
||||
|
||||
return self._llm.prediction(
|
||||
[self._gpt_instructions_html(),
|
||||
{
|
||||
"role": "user",
|
||||
"content": html
|
||||
}
|
||||
],
|
||||
ExamMapper.map_to_exam_model,
|
||||
str(self._level_json_schema())
|
||||
)
|
||||
|
||||
def _gpt_instructions_html(self):
|
||||
return {
|
||||
"role": "system",
|
||||
"content": (
|
||||
'You are GPT Scraper and your job is to clean dirty html into clean usable JSON formatted data.'
|
||||
'Your current task is to scrape html english questions sheets.\n\n'
|
||||
|
||||
'In the question sheet you will only see 4 types of question:\n'
|
||||
'- blank space multiple choice\n'
|
||||
'- underline multiple choice\n'
|
||||
'- reading passage blank space multiple choice\n'
|
||||
'- reading passage multiple choice\n\n'
|
||||
|
||||
'For the first two types of questions the template is the same but the question prompts differ, '
|
||||
'whilst in the blank space multiple choice you must include in the prompt the blank spaces with '
|
||||
'multiple "_", in the underline you must include in the prompt the <u></u> to '
|
||||
'indicate the underline and the options a, b, c, d must be the ordered underlines in the prompt.\n\n'
|
||||
|
||||
'For the reading passage exercise you must handle the formatting of the passages. If it is a '
|
||||
'reading passage with blank spaces you will see blanks represented with (question id) followed by a '
|
||||
'line and your job is to replace the brackets with the question id and line with "{{question id}}" '
|
||||
'with 2 newlines between paragraphs. For the reading passages without blanks you must remove '
|
||||
'any numbers that may be there to specify paragraph numbers or line numbers, and place 2 newlines '
|
||||
'between paragraphs.\n\n'
|
||||
|
||||
'IMPORTANT: Note that for the reading passages, the html might not reflect the actual paragraph '
|
||||
'structure, don\'t format the reading passages paragraphs only by the <p></p> tags, try to figure '
|
||||
'out the best paragraph separation possible.'
|
||||
|
||||
'You will place all the information in a single JSON: {"parts": [{"exercises": [{...}], "context": ""}]}\n '
|
||||
'Where {...} are the exercises templates for each part of a question sheet and the optional field '
|
||||
'context.'
|
||||
|
||||
'IMPORTANT: The question sheet may be divided by sections but you need to only consider the parts, '
|
||||
'so that you can group the exercises by the parts that are in the html, this is crucial since only '
|
||||
'reading passage multiple choice require context and if the context is included in parts where it '
|
||||
'is not required the UI will be messed up. Some make sure to correctly group the exercises by parts.\n'
|
||||
|
||||
'The templates for the exercises are the following:\n'
|
||||
'- blank space multiple choice, underline multiple choice and reading passage multiple choice: '
|
||||
f'{self._multiple_choice_html()}\n'
|
||||
f'- reading passage blank space multiple choice: {self._passage_blank_space_html()}\n'
|
||||
|
||||
'IMPORTANT: For the reading passage multiple choice the context field must be set with the reading '
|
||||
'passages without paragraphs or line numbers, with 2 newlines between paragraphs, for the other '
|
||||
'exercises exclude the context field.'
|
||||
)
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _multiple_choice_html():
|
||||
return {
|
||||
"type": "multipleChoice",
|
||||
"prompt": "Select the appropriate option.",
|
||||
"questions": [
|
||||
{
|
||||
"id": "<the question id>",
|
||||
"prompt": "<the question>",
|
||||
"solution": "<the option id solution>",
|
||||
"options": [
|
||||
{
|
||||
"id": "A",
|
||||
"text": "<the a option>"
|
||||
},
|
||||
{
|
||||
"id": "B",
|
||||
"text": "<the b option>"
|
||||
},
|
||||
{
|
||||
"id": "C",
|
||||
"text": "<the c option>"
|
||||
},
|
||||
{
|
||||
"id": "D",
|
||||
"text": "<the d option>"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _passage_blank_space_html():
|
||||
return {
|
||||
"type": "fillBlanks",
|
||||
"variant": "mc",
|
||||
"prompt": "Click a blank to select the appropriate word for it.",
|
||||
"text": (
|
||||
"<The whole text for the exercise with replacements for blank spaces and their "
|
||||
"ids with {{<question id>}} with 2 newlines between paragraphs>"
|
||||
),
|
||||
"solutions": [
|
||||
{
|
||||
"id": "<question id>",
|
||||
"solution": "<the option that holds the solution>"
|
||||
}
|
||||
],
|
||||
"words": [
|
||||
{
|
||||
"id": "<question id>",
|
||||
"options": {
|
||||
"A": "<a option>",
|
||||
"B": "<b option>",
|
||||
"C": "<c option>",
|
||||
"D": "<d option>"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
def _png_completion(self, path_id: str) -> Exam:
|
||||
FileHelper.pdf_to_png(path_id)
|
||||
|
||||
tmp_files = os.listdir(f'./tmp/{path_id}')
|
||||
pages = [f for f in tmp_files if f.startswith('page-') and f.endswith('.png')]
|
||||
pages.sort(key=lambda f: int(f.split('-')[1].split('.')[0]))
|
||||
|
||||
json_schema = {
|
||||
"components": [
|
||||
{"type": "part", "part": "<name or number of the part>"},
|
||||
self._multiple_choice_png(),
|
||||
{"type": "blanksPassage", "text": (
|
||||
"<The whole text for the exercise with replacements for blank spaces and their "
|
||||
"ids with {{<question id>}} with 2 newlines between paragraphs>"
|
||||
)},
|
||||
{"type": "passage", "context": (
|
||||
"<reading passages without paragraphs or line numbers, with 2 newlines between paragraphs>"
|
||||
)},
|
||||
self._passage_blank_space_png()
|
||||
]
|
||||
}
|
||||
|
||||
components = []
|
||||
|
||||
for i in range(len(pages)):
|
||||
current_page = pages[i]
|
||||
next_page = pages[i + 1] if i + 1 < len(pages) else None
|
||||
batch = [current_page, next_page] if next_page else [current_page]
|
||||
|
||||
sheet = self._png_batch(path_id, batch, json_schema)
|
||||
sheet.batch = i + 1
|
||||
components.append(sheet.dict())
|
||||
|
||||
batches = {"batches": components}
|
||||
with open('output.json', 'w') as json_file:
|
||||
json.dump(batches, json_file, indent=4)
|
||||
|
||||
return self._batches_to_exam_completion(batches)
|
||||
|
||||
def _png_batch(self, path_id: str, files: list[str], json_schema) -> Sheet:
|
||||
return self._llm.prediction(
|
||||
[self._gpt_instructions_png(),
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
*FileHelper.b64_pngs(path_id, files)
|
||||
]
|
||||
}
|
||||
],
|
||||
ExamMapper.map_to_sheet,
|
||||
str(json_schema)
|
||||
)
|
||||
|
||||
def _gpt_instructions_png(self):
|
||||
return {
|
||||
"role": "system",
|
||||
"content": (
|
||||
'You are GPT OCR and your job is to scan image text data and format it to JSON format.'
|
||||
'Your current task is to scan english questions sheets.\n\n'
|
||||
|
||||
'You will place all the information in a single JSON: {"components": [{...}]} where {...} is a set of '
|
||||
'sheet components you will retrieve from the images, the components and their corresponding JSON '
|
||||
'templates are as follows:\n'
|
||||
|
||||
'- Part, a standalone part or part of a section of the question sheet: '
|
||||
'{"type": "part", "part": "<name or number of the part>"}\n'
|
||||
|
||||
'- Multiple Choice Question, there are three types of multiple choice questions that differ on '
|
||||
'the prompt field of the template: blanks, underlines and normal. '
|
||||
|
||||
'In the blanks prompt you must leave 5 underscores to represent the blank space. '
|
||||
'In the underlines questions the objective is to pick the words that are incorrect in the given '
|
||||
'sentence, for these questions you must wrap the answer to the question with the html tag <u></u>, '
|
||||
'choose 3 other words to wrap in <u></u>, place them in the prompt field and use the underlined words '
|
||||
'in the order they appear in the question for the options A to D, disreguard options that might be '
|
||||
'included underneath the underlines question and use the ones you wrapped in <u></u>.'
|
||||
'In normal you just leave the question as is. '
|
||||
|
||||
f'The template for multiple choice questions is the following: {self._multiple_choice_png()}.\n'
|
||||
|
||||
'- Reading Passages, there are two types of reading passages. Reading passages where you will see '
|
||||
'blanks represented by a (question id) followed by a line, you must format these types of reading '
|
||||
'passages to be only the text with the brackets that have the question id and line replaced with '
|
||||
'"{{question id}}", also place 2 newlines between paragraphs. For the reading passages without blanks '
|
||||
'you must remove any numbers that may be there to specify paragraph numbers or line numbers, '
|
||||
'and place 2 newlines between paragraphs. '
|
||||
|
||||
'For the reading passages with blanks the template is: {"type": "blanksPassage", '
|
||||
'"text": "<The whole text for the exercise with replacements for blank spaces and their '
|
||||
'ids that are enclosed in brackets with {{<question id>}} also place 2 newlines between paragraphs>"}. '
|
||||
|
||||
'For the reading passage without blanks is: {"type": "passage", "context": "<reading passages without '
|
||||
'paragraphs or line numbers, with 2 newlines between paragraphs>"}\n'
|
||||
|
||||
'- Blanks Options, options for a blanks reading passage exercise, this type of component is a group of '
|
||||
'options with the question id and the options from a to d. The template is: '
|
||||
f'{self._passage_blank_space_png()}\n'
|
||||
|
||||
'IMPORTANT: You must place the components in the order that they were given to you. If an exercise or '
|
||||
'reading passages are cut off don\'t include them in the JSON.'
|
||||
)
|
||||
}
|
||||
|
||||
def _multiple_choice_png(self):
|
||||
multiple_choice = self._multiple_choice_html()["questions"][0]
|
||||
multiple_choice["type"] = "multipleChoice"
|
||||
multiple_choice.pop("solution")
|
||||
return multiple_choice
|
||||
|
||||
def _passage_blank_space_png(self):
|
||||
passage_blank_space = self._passage_blank_space_html()["words"][0]
|
||||
passage_blank_space["type"] = "fillBlanks"
|
||||
return passage_blank_space
|
||||
|
||||
def _batches_to_exam_completion(self, batches: Dict[str, Any]) -> Exam:
|
||||
return self._llm.prediction(
|
||||
[self._gpt_instructions_html(),
|
||||
{
|
||||
"role": "user",
|
||||
"content": str(batches)
|
||||
}
|
||||
],
|
||||
ExamMapper.map_to_exam_model,
|
||||
str(self._level_json_schema())
|
||||
)
|
||||
|
||||
def _gpt_instructions_batches(self):
|
||||
return {
|
||||
"role": "system",
|
||||
"content": (
|
||||
'You are helpfull assistant. Your task is to merge multiple batches of english question sheet '
|
||||
'components and solve the questions. Each batch may contain overlapping content with the previous '
|
||||
'batch, or close enough content which needs to be excluded. The components are as follows:'
|
||||
|
||||
'- Part, a standalone part or part of a section of the question sheet: '
|
||||
'{"type": "part", "part": "<name or number of the part>"}\n'
|
||||
|
||||
'- Multiple Choice Question, there are three types of multiple choice questions that differ on '
|
||||
'the prompt field of the template: blanks, underlines and normal. '
|
||||
|
||||
'In a blanks question, the prompt has underscores to represent the blank space, you must select the '
|
||||
'appropriate option to solve it.'
|
||||
|
||||
'In a underlines question, the prompt has 4 underlines represented by the html tags <u></u>, you must '
|
||||
'select the option that makes the prompt incorrect to solve it. If the options order doesn\'t reflect '
|
||||
'the order in which the underlines appear in the prompt you will need to fix it.'
|
||||
|
||||
'In a normal question there isn\'t either blanks or underlines in the prompt, you should just '
|
||||
'select the appropriate solution.'
|
||||
|
||||
f'The template for these questions is the same: {self._multiple_choice_png()}\n'
|
||||
|
||||
'- Reading Passages, there are two types of reading passages with different templates. The one with '
|
||||
'type "blanksPassage" where the text field holds the passage and a blank is represented by '
|
||||
'{{<some number>}} and the other one with type "passage" that has the context field with just '
|
||||
'reading passages. For both of these components you will have to remove any additional data that might '
|
||||
'be related to a question description and also remove some "(<question id>)" and "_" from blanksPassage'
|
||||
' if there are any. These components are used in conjunction with other ones.'
|
||||
|
||||
'- Blanks Options, options for a blanks reading passage exercise, this type of component is a group of '
|
||||
'options with the question id and the options from a to d. The template is: '
|
||||
f'{self._passage_blank_space_png()}\n\n'
|
||||
|
||||
'Now that you know the possible components here\'s what I want you to do:\n'
|
||||
'1. Remove duplicates. A batch will have duplicates of other batches and the components of '
|
||||
'the next batch should always take precedence over the previous one batch, what I mean by this is that '
|
||||
'if batch 1 has, for example, multiple choice question with id 10 and the next one also has id 10, '
|
||||
'you pick the next one.\n'
|
||||
'2. Solve the exercises. There are 4 types of exercises, the 3 multipleChoice variants + a fill blanks '
|
||||
'exercise. For the multiple choice question follow the previous instruction to solve them and place '
|
||||
f'them in this format: {self._multiple_choice_html()}. For the fill blanks exercises you need to match '
|
||||
'the correct blanksPassage to the correct fillBlanks options and then pick the correct option. Here is '
|
||||
f'the template for this exercise: {self._passage_blank_space_html()}.\n'
|
||||
f'3. Restructure the JSON to match this template: {self._level_json_schema()}. You must group the exercises by '
|
||||
'the parts in the order they appear in the batches components. The context field of a part is the '
|
||||
'context of a passage component that has text relevant to normal multiple choice questions.\n'
|
||||
|
||||
'Do your utmost to fullfill the requisites, make sure you include all non-duplicate questions'
|
||||
'in your response and correctly structure the JSON.'
|
||||
)
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def fix_ids(response):
|
||||
counter = 1
|
||||
for part in response["parts"]:
|
||||
for exercise in part["exercises"]:
|
||||
if exercise["type"] == "multipleChoice":
|
||||
for question in exercise["questions"]:
|
||||
question["id"] = counter
|
||||
counter += 1
|
||||
if exercise["type"] == "fillBlanks":
|
||||
for i in range(len(exercise["words"])):
|
||||
exercise["words"][i]["id"] = counter
|
||||
exercise["solutions"][i]["id"] = counter
|
||||
counter += 1
|
||||
return response
|
||||
29
modules/upload_level/sheet_dtos.py
Normal file
29
modules/upload_level/sheet_dtos.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import List, Dict, Union, Any, Optional
|
||||
|
||||
|
||||
class Option(BaseModel):
|
||||
id: str
|
||||
text: str
|
||||
|
||||
|
||||
class MultipleChoiceQuestion(BaseModel):
|
||||
type: str = "multipleChoice"
|
||||
id: str
|
||||
prompt: str
|
||||
variant: str = "text"
|
||||
options: List[Option]
|
||||
|
||||
|
||||
class FillBlanksWord(BaseModel):
|
||||
type: str = "fillBlanks"
|
||||
id: str
|
||||
options: Dict[str, str]
|
||||
|
||||
|
||||
Component = Union[MultipleChoiceQuestion, FillBlanksWord, Dict[str, Any]]
|
||||
|
||||
|
||||
class Sheet(BaseModel):
|
||||
batch: Optional[int] = None
|
||||
components: List[Component]
|
||||
BIN
requirements.txt
BIN
requirements.txt
Binary file not shown.
1
tmp/placeholder.txt
Normal file
1
tmp/placeholder.txt
Normal file
@@ -0,0 +1 @@
|
||||
THIS FILE ONLY EXISTS TO KEEP THIS FOLDER IN THE REPO
|
||||
Reference in New Issue
Block a user