Compare commits
90 Commits
refactor-t
...
release/mo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
676f660f3e | ||
|
|
6cb7c07f57 | ||
|
|
a328f01d2e | ||
|
|
a931c5ec2e | ||
|
|
bfc9565e85 | ||
|
|
3d70bcbfd1 | ||
|
|
a2cfa335d7 | ||
|
|
0427d6e1b4 | ||
|
|
31c6ed570a | ||
|
|
3a27c42a69 | ||
|
|
260dba1ee6 | ||
|
|
a88d6bb568 | ||
|
|
f0f904f2e4 | ||
|
|
a23bbe581a | ||
|
|
bb26282d25 | ||
|
|
73c29cda25 | ||
|
|
aaa3361575 | ||
|
|
94a16b636d | ||
|
|
cffec795a7 | ||
|
|
b2b4dfb74e | ||
|
|
2716f52a0a | ||
|
|
4099d99f80 | ||
|
|
ab4db36445 | ||
|
|
59f047afba | ||
|
|
09b57cb346 | ||
|
|
bfc3e3f083 | ||
|
|
7b5e10fd79 | ||
|
|
a2a160f61b | ||
|
|
5d5cd21e1e | ||
|
|
06a8384f42 | ||
|
|
dd74a3d259 | ||
|
|
efff0b904e | ||
|
|
cf7a966141 | ||
|
|
03f5b7d72c | ||
|
|
d68617f33b | ||
|
|
eeaa04f856 | ||
|
|
beccf8b501 | ||
|
|
470f4cc83b | ||
|
|
3ad411ed71 | ||
|
|
7144a3f3ca | ||
|
|
b795a3fb79 | ||
|
|
034be25e8e | ||
|
|
a931f06c47 | ||
|
|
8e56a3228b | ||
|
|
14c5914420 | ||
|
|
6878e0a276 | ||
|
|
1f29ac6ee5 | ||
|
|
a1ee7e47da | ||
|
|
adfc027458 | ||
|
|
3a7bb7764f | ||
|
|
19f204d74d | ||
|
|
88ba9ab561 | ||
|
|
34afb5d1e8 | ||
|
|
eb904f836a | ||
|
|
ca12ad1161 | ||
|
|
8b8460517c | ||
|
|
9be9bfce0e | ||
|
|
4776f24229 | ||
|
|
bf9251eebb | ||
|
|
1ecda04c6b | ||
|
|
d5621c1793 | ||
|
|
4c41942dfe | ||
|
|
bef606fe14 | ||
|
|
358f240d16 | ||
|
|
e7d84b9704 | ||
|
|
b4dc6be927 | ||
|
|
afca610c09 | ||
|
|
495502bc93 | ||
|
|
565874ad41 | ||
|
|
e693f5ee2a | ||
|
|
a8b46160d4 | ||
|
|
640039d372 | ||
|
|
a3cd1cdf59 | ||
|
|
9a696bbeb5 | ||
|
|
2adb7d1847 | ||
|
|
b93ead3a7b | ||
|
|
ad3a32ce45 | ||
|
|
ee5f23b3d7 | ||
|
|
545aee1a19 | ||
|
|
3f749f1ff5 | ||
|
|
32ac2149f5 | ||
|
|
64cc207fe8 | ||
|
|
a4caecdb4f | ||
|
|
20dfd5be78 | ||
|
|
1d110d5fa9 | ||
|
|
7633822916 | ||
|
|
9bc06d8340 | ||
|
|
4ff3b02a1d | ||
|
|
7637322239 | ||
|
|
3676d7ad39 |
@@ -5,3 +5,4 @@ README.md
|
||||
*.pyd
|
||||
__pycache__
|
||||
.pytest_cache
|
||||
/scripts
|
||||
|
||||
5
.env
5
.env
@@ -1,5 +0,0 @@
|
||||
OPENAI_API_KEY=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
|
||||
JWT_SECRET_KEY=6e9c124ba92e8814719dcb0f21200c8aa4d0f119a994ac5e06eb90a366c83ab2
|
||||
JWT_TEST_TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ0ZXN0In0.Emrs2D3BmMP4b3zMjw0fJTPeyMwWEBDbxx2vvaWguO0
|
||||
GOOGLE_APPLICATION_CREDENTIALS=firebase-configs/storied-phalanx-349916.json
|
||||
HEY_GEN_TOKEN=MjY4MDE0MjdjZmNhNDFmYTlhZGRkNmI3MGFlMzYwZDItMTY5NTExNzY3MA==
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@@ -2,3 +2,5 @@ __pycache__
|
||||
.idea
|
||||
.env
|
||||
.DS_Store
|
||||
/firebase-configs/test_firebase.json
|
||||
/scripts
|
||||
|
||||
8
.idea/.gitignore
generated
vendored
8
.idea/.gitignore
generated
vendored
@@ -1,8 +0,0 @@
|
||||
# Default ignored files
|
||||
/shelf/
|
||||
/workspace.xml
|
||||
# Editor-based HTTP Client requests
|
||||
/httpRequests/
|
||||
# Datasource local storage ignored files
|
||||
/dataSources/
|
||||
/dataSources.local.xml
|
||||
17
.idea/ielts-be.iml
generated
17
.idea/ielts-be.iml
generated
@@ -1,24 +1,17 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module type="PYTHON_MODULE" version="4">
|
||||
<component name="Flask">
|
||||
<option name="enabled" value="true" />
|
||||
</component>
|
||||
<component name="NewModuleRootManager">
|
||||
<content url="file://$MODULE_DIR$">
|
||||
<excludeFolder url="file://$MODULE_DIR$/venv" />
|
||||
<excludeFolder url="file://$MODULE_DIR$/.venv" />
|
||||
</content>
|
||||
<orderEntry type="jdk" jdkName="Python 3.9" jdkType="Python SDK" />
|
||||
<orderEntry type="jdk" jdkName="Python 3.11 (ielts-be)" jdkType="Python SDK" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
<component name="PackageRequirementsSettings">
|
||||
<option name="versionSpecifier" value="Don't specify version" />
|
||||
</component>
|
||||
<component name="TemplatesService">
|
||||
<option name="TEMPLATE_CONFIGURATION" value="Jinja2" />
|
||||
<option name="TEMPLATE_FOLDERS">
|
||||
<list>
|
||||
<option value="$MODULE_DIR$/../flaskProject\templates" />
|
||||
</list>
|
||||
</option>
|
||||
<component name="PyDocumentationSettings">
|
||||
<option name="format" value="GOOGLE" />
|
||||
<option name="myDocStringFormat" value="Google" />
|
||||
</component>
|
||||
</module>
|
||||
8
.idea/misc.xml
generated
8
.idea/misc.xml
generated
@@ -1,4 +1,10 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9" project-jdk-type="Python SDK" />
|
||||
<component name="Black">
|
||||
<option name="sdkName" value="Python 3.11 (ielts-be)" />
|
||||
</component>
|
||||
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11 (ielts-be)" project-jdk-type="Python SDK" />
|
||||
<component name="PyCharmProfessionalAdvertiser">
|
||||
<option name="shown" value="true" />
|
||||
</component>
|
||||
</project>
|
||||
2
.idea/vcs.xml
generated
2
.idea/vcs.xml
generated
@@ -1,6 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="VcsDirectoryMappings">
|
||||
<mapping directory="$PROJECT_DIR$" vcs="Git" />
|
||||
<mapping directory="" vcs="Git" />
|
||||
</component>
|
||||
</project>
|
||||
18
Dockerfile
18
Dockerfile
@@ -11,7 +11,23 @@ ENV APP_HOME /app
|
||||
WORKDIR $APP_HOME
|
||||
COPY . ./
|
||||
|
||||
RUN apt update && apt install -y ffmpeg
|
||||
RUN apt update && apt install -y \
|
||||
ffmpeg \
|
||||
poppler-utils \
|
||||
texlive-latex-base \
|
||||
texlive-fonts-recommended \
|
||||
texlive-latex-extra \
|
||||
texlive-xetex \
|
||||
pandoc \
|
||||
librsvg2-bin \
|
||||
curl \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
|
||||
RUN curl -sL https://deb.nodesource.com/setup_20.x | bash - \
|
||||
&& apt-get install -y nodejs
|
||||
|
||||
RUN npm install -g firebase-tools
|
||||
|
||||
# Install production dependencies.
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
BIN
faiss/ct_focus_tips_index.faiss
Normal file
BIN
faiss/ct_focus_tips_index.faiss
Normal file
Binary file not shown.
BIN
faiss/language_for_writing_tips_index.faiss
Normal file
BIN
faiss/language_for_writing_tips_index.faiss
Normal file
Binary file not shown.
BIN
faiss/reading_skill_tips_index.faiss
Normal file
BIN
faiss/reading_skill_tips_index.faiss
Normal file
Binary file not shown.
BIN
faiss/strategy_tips_index.faiss
Normal file
BIN
faiss/strategy_tips_index.faiss
Normal file
Binary file not shown.
BIN
faiss/tips_metadata.pkl
Normal file
BIN
faiss/tips_metadata.pkl
Normal file
Binary file not shown.
BIN
faiss/word_link_tips_index.faiss
Normal file
BIN
faiss/word_link_tips_index.faiss
Normal file
Binary file not shown.
BIN
faiss/word_partners_tips_index.faiss
Normal file
BIN
faiss/word_partners_tips_index.faiss
Normal file
Binary file not shown.
BIN
faiss/writing_skill_tips_index.faiss
Normal file
BIN
faiss/writing_skill_tips_index.faiss
Normal file
Binary file not shown.
@@ -18,7 +18,13 @@ GEN_FIELDS = ['topic']
|
||||
GEN_TEXT_FIELDS = ['title']
|
||||
LISTENING_GEN_FIELDS = ['transcript', 'exercise']
|
||||
READING_EXERCISE_TYPES = ['fillBlanks', 'writeBlanks', 'trueFalse', 'paragraphMatch']
|
||||
READING_3_EXERCISE_TYPES = ['fillBlanks', 'writeBlanks', 'trueFalse', 'paragraphMatch', 'ideaMatch']
|
||||
LISTENING_EXERCISE_TYPES = ['multipleChoice', 'writeBlanksQuestions', 'writeBlanksFill', 'writeBlanksForm']
|
||||
LISTENING_1_EXERCISE_TYPES = ['multipleChoice', 'writeBlanksQuestions', 'writeBlanksFill', 'writeBlanksFill',
|
||||
'writeBlanksForm', 'writeBlanksForm', 'writeBlanksForm', 'writeBlanksForm']
|
||||
LISTENING_2_EXERCISE_TYPES = ['multipleChoice', 'writeBlanksQuestions']
|
||||
LISTENING_3_EXERCISE_TYPES = ['multipleChoice3Options', 'writeBlanksQuestions']
|
||||
LISTENING_4_EXERCISE_TYPES = ['multipleChoice', 'writeBlanksQuestions', 'writeBlanksFill', 'writeBlanksForm']
|
||||
|
||||
TOTAL_READING_PASSAGE_1_EXERCISES = 13
|
||||
TOTAL_READING_PASSAGE_2_EXERCISES = 13
|
||||
@@ -35,7 +41,7 @@ SPEAKING_MIN_TIMER_DEFAULT = 14
|
||||
|
||||
BLACKLISTED_WORDS = ["jesus", "sex", "gay", "lesbian", "homosexual", "god", "angel", "pornography", "beer", "wine",
|
||||
"cocaine", "alcohol", "nudity", "lgbt", "casino", "gambling", "catholicism",
|
||||
"discrimination", "politics", "politic", "christianity", "islam", "christian", "christians",
|
||||
"discrimination", "politic", "christianity", "islam", "christian", "christians",
|
||||
"jews", "jew", "discrimination", "discriminatory"]
|
||||
|
||||
EN_US_VOICES = [
|
||||
@@ -141,7 +147,6 @@ mti_topics = [
|
||||
"Poverty Alleviation",
|
||||
"Cybersecurity and Privacy",
|
||||
"Human Rights",
|
||||
"Social Justice",
|
||||
"Food and Agriculture",
|
||||
"Cyberbullying and Online Safety",
|
||||
"Linguistic Diversity",
|
||||
@@ -169,7 +174,6 @@ topics = [
|
||||
"Space Exploration",
|
||||
"Artificial Intelligence",
|
||||
"Climate Change",
|
||||
"World Religions",
|
||||
"The Human Brain",
|
||||
"Renewable Energy",
|
||||
"Cultural Diversity",
|
||||
@@ -232,7 +236,6 @@ topics = [
|
||||
"Meditation Practices",
|
||||
"Literary Symbolism",
|
||||
"Marine Conservation",
|
||||
"Social Justice Movements",
|
||||
"Sustainable Tourism",
|
||||
"Ancient Philosophy",
|
||||
"Cold War Era",
|
||||
|
||||
1133
helper/exercises.py
1133
helper/exercises.py
File diff suppressed because it is too large
Load Diff
@@ -1,7 +1,7 @@
|
||||
import logging
|
||||
|
||||
from firebase_admin import firestore
|
||||
from google.cloud import storage
|
||||
from pymongo.database import Database
|
||||
|
||||
|
||||
def download_firebase_file(bucket_name, source_blob_name, destination_file_name):
|
||||
@@ -50,38 +50,16 @@ def upload_file_firebase_get_url(bucket_name, destination_blob_name, source_file
|
||||
return None
|
||||
|
||||
|
||||
def save_to_db(collection: str, item):
|
||||
db = firestore.client()
|
||||
collection_ref = db.collection(collection)
|
||||
(update_time, document_ref) = collection_ref.add(item)
|
||||
def save_to_db_with_id(mongo_db: Database, collection: str, item, id: str):
|
||||
collection_ref = mongo_db[collection]
|
||||
|
||||
document_ref = collection_ref.insert_one({"id": id, **item})
|
||||
if document_ref:
|
||||
logging.info(f"Document added with ID: {document_ref.id}")
|
||||
return (True, document_ref.id)
|
||||
logging.info(f"Document added with ID: {document_ref.inserted_id}")
|
||||
return (True, document_ref.inserted_id)
|
||||
else:
|
||||
return (False, None)
|
||||
|
||||
|
||||
def save_to_db_with_id(collection: str, item, id: str):
|
||||
db = firestore.client()
|
||||
collection_ref = db.collection(collection)
|
||||
# Reference to the specific document with the desired ID
|
||||
document_ref = collection_ref.document(id)
|
||||
# Set the data to the document
|
||||
document_ref.set(item)
|
||||
if document_ref:
|
||||
logging.info(f"Document added with ID: {document_ref.id}")
|
||||
return (True, document_ref.id)
|
||||
else:
|
||||
return (False, None)
|
||||
|
||||
|
||||
def get_all(collection: str):
|
||||
db = firestore.client()
|
||||
collection_ref = db.collection(collection)
|
||||
|
||||
all_exercises = (
|
||||
collection_ref
|
||||
.get()
|
||||
)
|
||||
|
||||
return all_exercises
|
||||
def get_all(mongo_db: Database, collection: str):
|
||||
return list(mongo_db[collection].find())
|
||||
|
||||
50
helper/gpt_zero.py
Normal file
50
helper/gpt_zero.py
Normal file
@@ -0,0 +1,50 @@
|
||||
from logging import getLogger
|
||||
from typing import Dict, Optional
|
||||
import requests
|
||||
|
||||
|
||||
class GPTZero:
|
||||
_GPT_ZERO_ENDPOINT = 'https://api.gptzero.me/v2/predict/text'
|
||||
|
||||
def __init__(self, gpt_zero_key: str):
|
||||
self._logger = getLogger(__name__)
|
||||
if gpt_zero_key is None:
|
||||
self._logger.warning('GPT Zero key was not included! Skipping ai detection when grading.')
|
||||
self._gpt_zero_key = gpt_zero_key
|
||||
self._header = {
|
||||
'x-api-key': gpt_zero_key
|
||||
}
|
||||
|
||||
def run_detection(self, text: str):
|
||||
if self._gpt_zero_key is None:
|
||||
return None
|
||||
data = {
|
||||
'document': text,
|
||||
'version': '',
|
||||
'multilingual': False
|
||||
}
|
||||
response = requests.post(self._GPT_ZERO_ENDPOINT, headers=self._header, json=data)
|
||||
if response.status_code != 200:
|
||||
self._logger.error(f'GPT\'s Zero Endpoint returned with {response.status_code}: {response.json()}')
|
||||
return None
|
||||
return self._parse_detection(response.json())
|
||||
|
||||
def _parse_detection(self, response: Dict) -> Optional[Dict]:
|
||||
try:
|
||||
text_scan = response["documents"][0]
|
||||
filtered_sentences = [
|
||||
{
|
||||
"sentence": item["sentence"],
|
||||
"highlight_sentence_for_ai": item["highlight_sentence_for_ai"]
|
||||
}
|
||||
for item in text_scan["sentences"]
|
||||
]
|
||||
return {
|
||||
"class_probabilities": text_scan["class_probabilities"],
|
||||
"confidence_category": text_scan["confidence_category"],
|
||||
"predicted_class": text_scan["predicted_class"],
|
||||
"sentences": filtered_sentences
|
||||
}
|
||||
except Exception as e:
|
||||
self._logger.error(f'Failed to parse GPT\'s Zero response: {str(e)}')
|
||||
return None
|
||||
@@ -1,17 +1,19 @@
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from logging import getLogger
|
||||
|
||||
import requests
|
||||
from dotenv import load_dotenv
|
||||
|
||||
import app
|
||||
from helper.constants import *
|
||||
from helper.firebase_helper import upload_file_firebase_get_url, save_to_db_with_id
|
||||
from heygen.AvatarEnum import AvatarEnum
|
||||
|
||||
load_dotenv()
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
# Get HeyGen token
|
||||
TOKEN = os.getenv("HEY_GEN_TOKEN")
|
||||
FIREBASE_BUCKET = os.getenv('FIREBASE_BUCKET')
|
||||
@@ -29,26 +31,32 @@ GET_HEADER = {
|
||||
|
||||
|
||||
def create_videos_and_save_to_db(exercises, template, id):
|
||||
avatar = random.choice(list(AvatarEnum))
|
||||
# Speaking 1
|
||||
# Using list comprehension to find the element with the desired value in the 'type' field
|
||||
found_exercises_1 = [element for element in exercises if element.get('type') == 1]
|
||||
# Check if any elements were found
|
||||
if found_exercises_1:
|
||||
exercise_1 = found_exercises_1[0]
|
||||
app.app.logger.info('Creating video for speaking part 1')
|
||||
sp1_result = create_video(exercise_1["question"], random.choice(list(AvatarEnum)))
|
||||
sp1_questions = []
|
||||
logger.info('Creating video for speaking part 1')
|
||||
for question in exercise_1["questions"]:
|
||||
sp1_result = create_video(question, avatar)
|
||||
if sp1_result is not None:
|
||||
sound_file_path = VIDEO_FILES_PATH + sp1_result
|
||||
firebase_file_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + sp1_result
|
||||
url = upload_file_firebase_get_url(FIREBASE_BUCKET, firebase_file_path, sound_file_path)
|
||||
sp1_video_path = firebase_file_path
|
||||
sp1_video_url = url
|
||||
template["exercises"][0]["text"] = exercise_1["question"]
|
||||
template["exercises"][0]["title"] = exercise_1["topic"]
|
||||
template["exercises"][0]["video_url"] = sp1_video_url
|
||||
template["exercises"][0]["video_path"] = sp1_video_path
|
||||
video = {
|
||||
"text": question,
|
||||
"video_path": firebase_file_path,
|
||||
"video_url": url
|
||||
}
|
||||
sp1_questions.append(video)
|
||||
else:
|
||||
app.app.logger.error("Failed to create video for part 1 question: " + exercise_1["question"])
|
||||
logger.error("Failed to create video for part 1 question: " + exercise_1["question"])
|
||||
template["exercises"][0]["prompts"] = sp1_questions
|
||||
template["exercises"][0]["first_title"] = exercise_1["first_topic"]
|
||||
template["exercises"][0]["second_title"] = exercise_1["second_topic"]
|
||||
|
||||
# Speaking 2
|
||||
# Using list comprehension to find the element with the desired value in the 'type' field
|
||||
@@ -56,8 +64,8 @@ def create_videos_and_save_to_db(exercises, template, id):
|
||||
# Check if any elements were found
|
||||
if found_exercises_2:
|
||||
exercise_2 = found_exercises_2[0]
|
||||
app.app.logger.info('Creating video for speaking part 2')
|
||||
sp2_result = create_video(exercise_2["question"], random.choice(list(AvatarEnum)))
|
||||
logger.info('Creating video for speaking part 2')
|
||||
sp2_result = create_video(exercise_2["question"], avatar)
|
||||
if sp2_result is not None:
|
||||
sound_file_path = VIDEO_FILES_PATH + sp2_result
|
||||
firebase_file_path = FIREBASE_SPEAKING_VIDEO_FILES_PATH + sp2_result
|
||||
@@ -70,7 +78,7 @@ def create_videos_and_save_to_db(exercises, template, id):
|
||||
template["exercises"][1]["video_url"] = sp2_video_url
|
||||
template["exercises"][1]["video_path"] = sp2_video_path
|
||||
else:
|
||||
app.app.logger.error("Failed to create video for part 2 question: " + exercise_2["question"])
|
||||
logger.error("Failed to create video for part 2 question: " + exercise_2["question"])
|
||||
|
||||
# Speaking 3
|
||||
# Using list comprehension to find the element with the desired value in the 'type' field
|
||||
@@ -79,8 +87,7 @@ def create_videos_and_save_to_db(exercises, template, id):
|
||||
if found_exercises_3:
|
||||
exercise_3 = found_exercises_3[0]
|
||||
sp3_questions = []
|
||||
avatar = random.choice(list(AvatarEnum))
|
||||
app.app.logger.info('Creating videos for speaking part 3')
|
||||
logger.info('Creating videos for speaking part 3')
|
||||
for question in exercise_3["questions"]:
|
||||
result = create_video(question, avatar)
|
||||
if result is not None:
|
||||
@@ -94,7 +101,7 @@ def create_videos_and_save_to_db(exercises, template, id):
|
||||
}
|
||||
sp3_questions.append(video)
|
||||
else:
|
||||
app.app.logger.error("Failed to create video for part 3 question: " + question)
|
||||
logger.error("Failed to create video for part 3 question: " + question)
|
||||
template["exercises"][2]["prompts"] = sp3_questions
|
||||
template["exercises"][2]["title"] = exercise_3["topic"]
|
||||
|
||||
@@ -106,7 +113,7 @@ def create_videos_and_save_to_db(exercises, template, id):
|
||||
template["exercises"].pop(0)
|
||||
|
||||
save_to_db_with_id("speaking", template, id)
|
||||
app.app.logger.info('Saved speaking to DB with id ' + id + " : " + str(template))
|
||||
logger.info('Saved speaking to DB with id ' + id + " : " + str(template))
|
||||
|
||||
|
||||
def create_video(text, avatar):
|
||||
@@ -127,8 +134,8 @@ def create_video(text, avatar):
|
||||
}
|
||||
}
|
||||
response = requests.post(create_video_url, headers=POST_HEADER, json=data)
|
||||
app.app.logger.info(response.status_code)
|
||||
app.app.logger.info(response.json())
|
||||
logger.info(response.status_code)
|
||||
logger.info(response.json())
|
||||
|
||||
# GET TO CHECK STATUS AND GET VIDEO WHEN READY
|
||||
video_id = response.json()["data"]["video_id"]
|
||||
@@ -147,11 +154,11 @@ def create_video(text, avatar):
|
||||
error = response_data["data"]["error"]
|
||||
|
||||
if status != "completed" and error is None:
|
||||
app.app.logger.info(f"Status: {status}")
|
||||
logger.info(f"Status: {status}")
|
||||
time.sleep(10) # Wait for 10 second before the next request
|
||||
|
||||
app.app.logger.info(response.status_code)
|
||||
app.app.logger.info(response.json())
|
||||
logger.info(response.status_code)
|
||||
logger.info(response.json())
|
||||
|
||||
# DOWNLOAD VIDEO
|
||||
download_url = response.json()['data']['video_url']
|
||||
@@ -165,8 +172,8 @@ def create_video(text, avatar):
|
||||
output_path = os.path.join(output_directory, output_filename)
|
||||
with open(output_path, 'wb') as f:
|
||||
f.write(response.content)
|
||||
app.app.logger.info(f"File '{output_filename}' downloaded successfully.")
|
||||
logger.info(f"File '{output_filename}' downloaded successfully.")
|
||||
return output_filename
|
||||
else:
|
||||
app.app.logger.error(f"Failed to download file. Status code: {response.status_code}")
|
||||
logger.error(f"Failed to download file. Status code: {response.status_code}")
|
||||
return None
|
||||
|
||||
@@ -2,8 +2,8 @@ import json
|
||||
import os
|
||||
import re
|
||||
|
||||
from openai import OpenAI
|
||||
from dotenv import load_dotenv
|
||||
from openai import OpenAI
|
||||
|
||||
from helper.constants import BLACKLISTED_WORDS, GPT_3_5_TURBO
|
||||
from helper.token_counter import count_tokens
|
||||
@@ -54,7 +54,7 @@ def check_fields(obj, fields):
|
||||
return all(field in obj for field in fields)
|
||||
|
||||
|
||||
def make_openai_call(model, messages, token_count, fields_to_check, temperature):
|
||||
def make_openai_call(model, messages, token_count, fields_to_check, temperature, check_blacklisted=True):
|
||||
global try_count
|
||||
result = client.chat.completions.create(
|
||||
model=model,
|
||||
@@ -65,6 +65,7 @@ def make_openai_call(model, messages, token_count, fields_to_check, temperature)
|
||||
)
|
||||
result = result.choices[0].message.content
|
||||
|
||||
if check_blacklisted:
|
||||
found_blacklisted_word = get_found_blacklisted_words(result)
|
||||
|
||||
if found_blacklisted_word is not None and try_count < TRY_LIMIT:
|
||||
@@ -188,7 +189,7 @@ def get_fixed_text(text):
|
||||
}
|
||||
]
|
||||
token_count = count_total_tokens(messages)
|
||||
response = make_openai_call(GPT_3_5_TURBO, messages, token_count, ["fixed_text"], 0.2)
|
||||
response = make_openai_call(GPT_3_5_TURBO, messages, token_count, ["fixed_text"], 0.2, False)
|
||||
return response["fixed_text"]
|
||||
|
||||
|
||||
@@ -203,7 +204,7 @@ def get_speaking_corrections(text):
|
||||
}
|
||||
]
|
||||
token_count = count_total_tokens(messages)
|
||||
response = make_openai_call(GPT_3_5_TURBO, messages, token_count, ["fixed_text"], 0.2)
|
||||
response = make_openai_call(GPT_3_5_TURBO, messages, token_count, ["fixed_text"], 0.2, False)
|
||||
return response["fixed_text"]
|
||||
|
||||
|
||||
@@ -211,6 +212,7 @@ def has_blacklisted_words(text: str):
|
||||
text_lower = text.lower()
|
||||
return any(word in text_lower for word in BLACKLISTED_WORDS)
|
||||
|
||||
|
||||
def get_found_blacklisted_words(text: str):
|
||||
text_lower = text.lower()
|
||||
for word in BLACKLISTED_WORDS:
|
||||
@@ -218,6 +220,7 @@ def get_found_blacklisted_words(text: str):
|
||||
return word
|
||||
return None
|
||||
|
||||
|
||||
def remove_special_characters_from_beginning(string):
|
||||
cleaned_string = string.lstrip('\n')
|
||||
if string.startswith("'") or string.startswith('"'):
|
||||
@@ -239,6 +242,7 @@ def replace_expression_in_object(obj, expression, replacement):
|
||||
obj[key] = replace_expression_in_object(obj[key], expression, replacement)
|
||||
return obj
|
||||
|
||||
|
||||
def count_total_tokens(messages):
|
||||
total_tokens = 0
|
||||
for message in messages:
|
||||
|
||||
@@ -1136,12 +1136,11 @@ def getSpeakingTemplate():
|
||||
"exercises": [
|
||||
{
|
||||
"id": str(uuid.uuid4()),
|
||||
"prompts": [],
|
||||
"text": "text",
|
||||
"title": "topic",
|
||||
"video_url": "sp1_video_url",
|
||||
"video_path": "sp1_video_path",
|
||||
"type": "speaking"
|
||||
"prompts": ["questions"],
|
||||
"text": "Listen carefully and respond.",
|
||||
"first_title": "first_topic",
|
||||
"second_title": "second_topic",
|
||||
"type": "interactiveSpeaking"
|
||||
},
|
||||
{
|
||||
"id": str(uuid.uuid4()),
|
||||
|
||||
@@ -95,17 +95,26 @@ def conversation_text_to_speech(conversation: list, file_name: str):
|
||||
|
||||
|
||||
def has_words(text: str):
|
||||
if not has_common_words(text):
|
||||
return False
|
||||
english_words = set(words.words())
|
||||
words_in_input = text.split()
|
||||
return any(word.lower() in english_words for word in words_in_input)
|
||||
|
||||
|
||||
def has_x_words(text: str, quantity):
|
||||
if not has_common_words(text):
|
||||
return False
|
||||
english_words = set(words.words())
|
||||
words_in_input = text.split()
|
||||
english_word_count = sum(1 for word in words_in_input if word.lower() in english_words)
|
||||
return english_word_count >= quantity
|
||||
|
||||
def has_common_words(text: str):
|
||||
english_words = {"the", "be", "to", "of", "and", "a", "in", "that", "have", "i"}
|
||||
words_in_input = text.split()
|
||||
english_word_count = sum(1 for word in words_in_input if word.lower() in english_words)
|
||||
return english_word_count >= 10
|
||||
|
||||
def divide_text(text, max_length=3000):
|
||||
if len(text) <= max_length:
|
||||
|
||||
5
modules/__init__.py
Normal file
5
modules/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .gpt import GPT
|
||||
|
||||
__all__ = [
|
||||
"GPT"
|
||||
]
|
||||
5
modules/batch_users/__init__.py
Normal file
5
modules/batch_users/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .service import BatchUsers
|
||||
|
||||
__all__ = [
|
||||
"BatchUsers"
|
||||
]
|
||||
31
modules/batch_users/batch_users.py
Normal file
31
modules/batch_users/batch_users.py
Normal file
@@ -0,0 +1,31 @@
|
||||
import uuid
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class DemographicInfo(BaseModel):
|
||||
phone: str
|
||||
passport_id: Optional[str] = None
|
||||
country: Optional[str] = None
|
||||
|
||||
|
||||
class UserDTO(BaseModel):
|
||||
id: uuid.UUID = Field(default_factory=uuid.uuid4)
|
||||
email: str
|
||||
name: str
|
||||
type: str
|
||||
passport_id: str
|
||||
passwordHash: str
|
||||
passwordSalt: str
|
||||
groupName: Optional[str] = None
|
||||
corporate: Optional[str] = None
|
||||
studentID: Optional[str] = None
|
||||
expiryDate: Optional[str] = None
|
||||
demographicInformation: Optional[DemographicInfo] = None
|
||||
|
||||
|
||||
class BatchUsersDTO(BaseModel):
|
||||
makerID: str
|
||||
users: list[UserDTO]
|
||||
269
modules/batch_users/service.py
Normal file
269
modules/batch_users/service.py
Normal file
@@ -0,0 +1,269 @@
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from logging import getLogger
|
||||
|
||||
import pandas as pd
|
||||
from typing import Dict
|
||||
|
||||
import shortuuid
|
||||
from pymongo.database import Database
|
||||
|
||||
from modules.batch_users.batch_users import BatchUsersDTO, UserDTO
|
||||
from modules.helper.file_helper import FileHelper
|
||||
|
||||
|
||||
class BatchUsers:
|
||||
|
||||
_DEFAULT_DESIRED_LEVELS = {
|
||||
"reading": 9,
|
||||
"listening": 9,
|
||||
"writing": 9,
|
||||
"speaking": 9,
|
||||
}
|
||||
|
||||
_DEFAULT_LEVELS = {
|
||||
"reading": 0,
|
||||
"listening": 0,
|
||||
"writing": 0,
|
||||
"speaking": 0,
|
||||
}
|
||||
|
||||
def __init__(self, mongo: Database):
|
||||
self._db: Database = mongo
|
||||
self._logger = getLogger(__name__)
|
||||
|
||||
def batch_users(self, request_data: Dict):
|
||||
batch_dto = self._map_to_batch(request_data)
|
||||
|
||||
file_name = f'{uuid.uuid4()}.csv'
|
||||
path = f'./tmp/{file_name}'
|
||||
self._generate_firebase_auth_csv(batch_dto, path)
|
||||
|
||||
result = self._upload_users('./tmp', file_name)
|
||||
if result.returncode != 0:
|
||||
error_msg = f"Couldn't upload users. Failed to run command firebase auth import -> ```cmd {result.stderr}```"
|
||||
self._logger.error(error_msg)
|
||||
return error_msg
|
||||
|
||||
self._init_users(batch_dto)
|
||||
|
||||
FileHelper.remove_file(path)
|
||||
return {"ok": True}
|
||||
|
||||
@staticmethod
|
||||
def _map_to_batch(request_data: Dict) -> BatchUsersDTO:
|
||||
users: list[UserDTO] = [UserDTO(**user) for user in request_data["users"]]
|
||||
return BatchUsersDTO(makerID=request_data["makerID"], users=users)
|
||||
|
||||
@staticmethod
|
||||
def _generate_firebase_auth_csv(batch_dto: BatchUsersDTO, path: str):
|
||||
# https://firebase.google.com/docs/cli/auth#file_format
|
||||
columns = [
|
||||
'UID', 'Email', 'Email Verified', 'Password Hash', 'Password Salt', 'Name',
|
||||
'Photo URL', 'Google ID', 'Google Email', 'Google Display Name', 'Google Photo URL',
|
||||
'Facebook ID', 'Facebook Email', 'Facebook Display Name', 'Facebook Photo URL',
|
||||
'Twitter ID', 'Twitter Email', 'Twitter Display Name', 'Twitter Photo URL',
|
||||
'GitHub ID', 'GitHub Email', 'GitHub Display Name', 'GitHub Photo URL',
|
||||
'User Creation Time', 'Last Sign-In Time', 'Phone Number'
|
||||
]
|
||||
users_data = []
|
||||
|
||||
current_time = int(time.time() * 1000)
|
||||
|
||||
for user in batch_dto.users:
|
||||
user_data = {
|
||||
'UID': str(user.id),
|
||||
'Email': user.email,
|
||||
'Email Verified': False,
|
||||
'Password Hash': user.passwordHash,
|
||||
'Password Salt': user.passwordSalt,
|
||||
'Name': '',
|
||||
'Photo URL': '',
|
||||
'Google ID': '',
|
||||
'Google Email': '',
|
||||
'Google Display Name': '',
|
||||
'Google Photo URL': '',
|
||||
'Facebook ID': '',
|
||||
'Facebook Email': '',
|
||||
'Facebook Display Name': '',
|
||||
'Facebook Photo URL': '',
|
||||
'Twitter ID': '',
|
||||
'Twitter Email': '',
|
||||
'Twitter Display Name': '',
|
||||
'Twitter Photo URL': '',
|
||||
'GitHub ID': '',
|
||||
'GitHub Email': '',
|
||||
'GitHub Display Name': '',
|
||||
'GitHub Photo URL': '',
|
||||
'User Creation Time': current_time,
|
||||
'Last Sign-In Time': '',
|
||||
'Phone Number': ''
|
||||
}
|
||||
users_data.append(user_data)
|
||||
|
||||
df = pd.DataFrame(users_data, columns=columns)
|
||||
df.to_csv(path, index=False, header=False)
|
||||
|
||||
@staticmethod
|
||||
def _upload_users(directory: str, file_name: str):
|
||||
command = (
|
||||
f'firebase auth:import {file_name} '
|
||||
f'--hash-algo=SCRYPT '
|
||||
f'--hash-key={os.getenv("FIREBASE_SCRYPT_B64_SIGNER_KEY")} '
|
||||
f'--salt-separator={os.getenv("FIREBASE_SCRYPT_B64_SALT_SEPARATOR")} '
|
||||
f'--rounds={os.getenv("FIREBASE_SCRYPT_ROUNDS")} '
|
||||
f'--mem-cost={os.getenv("FIREBASE_SCRYPT_MEM_COST")} '
|
||||
f'--project={os.getenv("FIREBASE_PROJECT_ID")} '
|
||||
)
|
||||
|
||||
result = subprocess.run(command, shell=True, cwd=directory, capture_output=True, text=True)
|
||||
return result
|
||||
|
||||
def _init_users(self, batch_users: BatchUsersDTO):
|
||||
maker_id = batch_users.makerID
|
||||
for user in batch_users.users:
|
||||
self._insert_new_user(user)
|
||||
code = self._create_code(user, maker_id)
|
||||
|
||||
if user.type == "corporate":
|
||||
self._set_corporate_default_groups(user)
|
||||
|
||||
if user.corporate:
|
||||
self._assign_corporate_to_user(user, code)
|
||||
|
||||
if user.groupName and len(user.groupName.strip()) > 0:
|
||||
self._assign_user_to_group_by_name(user, maker_id)
|
||||
|
||||
def _insert_new_user(self, user: UserDTO):
|
||||
new_user = {
|
||||
**user.dict(exclude={
|
||||
'passport_id', 'groupName', 'expiryDate',
|
||||
'corporate', 'passwordHash', 'passwordSalt'
|
||||
}),
|
||||
'bio': "",
|
||||
'focus': "academic",
|
||||
'status': "active",
|
||||
'desiredLevels': self._DEFAULT_DESIRED_LEVELS,
|
||||
'profilePicture': "/defaultAvatar.png",
|
||||
'levels': self._DEFAULT_LEVELS,
|
||||
'isFirstLogin': False,
|
||||
'isVerified': True,
|
||||
'registrationDate': datetime.now(),
|
||||
'subscriptionExpirationDate': user.expiryDate
|
||||
}
|
||||
self._db.users.insert_one(new_user)
|
||||
|
||||
def _create_code(self, user: UserDTO, maker_id: str) -> str:
|
||||
code = shortuuid.ShortUUID().random(length=6)
|
||||
self._db.codes.insert_one({
|
||||
'id': code,
|
||||
'code': code,
|
||||
'creator': maker_id,
|
||||
'expiryDate': user.expiryDate,
|
||||
'type': user.type,
|
||||
'creationDate': datetime.now(),
|
||||
'userId': str(user.id),
|
||||
'email': user.email,
|
||||
'name': user.name,
|
||||
'passport_id': user.passport_id
|
||||
})
|
||||
return code
|
||||
|
||||
def _set_corporate_default_groups(self, user: UserDTO):
|
||||
user_id = str(user.id)
|
||||
default_groups = [
|
||||
{
|
||||
'admin': user_id,
|
||||
'id': str(uuid.uuid4()),
|
||||
'name': "Teachers",
|
||||
'participants': [],
|
||||
'disableEditing': True,
|
||||
},
|
||||
{
|
||||
'admin': user_id,
|
||||
'id': str(uuid.uuid4()),
|
||||
'name': "Students",
|
||||
'participants': [],
|
||||
'disableEditing': True,
|
||||
},
|
||||
{
|
||||
'admin': user_id,
|
||||
'id': str(uuid.uuid4()),
|
||||
'name': "Corporate",
|
||||
'participants': [],
|
||||
'disableEditing': True,
|
||||
}
|
||||
]
|
||||
for group in default_groups:
|
||||
self._db.groups.insert_one(group)
|
||||
|
||||
def _assign_corporate_to_user(self, user: UserDTO, code: str):
|
||||
user_id = str(user.id)
|
||||
corporate_user = self._db.users.find_one(
|
||||
{"email": user.corporate}
|
||||
)
|
||||
if corporate_user:
|
||||
self._db.codes.update_one(
|
||||
{"id": code},
|
||||
{"$set": {"creator": corporate_user.id}},
|
||||
upsert=True
|
||||
)
|
||||
group_type = "Students" if user.type == "student" else "Teachers"
|
||||
|
||||
group = self._db.groups.find_one(
|
||||
{
|
||||
"admin": corporate_user.id,
|
||||
"name": group_type
|
||||
}
|
||||
)
|
||||
|
||||
if group:
|
||||
participants = group['participants']
|
||||
if user_id not in participants:
|
||||
participants.append(user_id)
|
||||
self._db.groups.update_one(
|
||||
{"id": group.id},
|
||||
{"$set": {"participants": participants}}
|
||||
)
|
||||
|
||||
else:
|
||||
group = {
|
||||
'admin': corporate_user.id,
|
||||
'id': str(uuid.uuid4()),
|
||||
'name': group_type,
|
||||
'participants': [user_id],
|
||||
'disableEditing': True,
|
||||
}
|
||||
|
||||
self._db.groups.insert_one(group)
|
||||
|
||||
def _assign_user_to_group_by_name(self, user: UserDTO, maker_id: str):
|
||||
user_id = str(user.id)
|
||||
|
||||
group = self._db.groups.find_one(
|
||||
{
|
||||
"admin": maker_id,
|
||||
"name": user.group_name.strip()
|
||||
}
|
||||
)
|
||||
|
||||
if group:
|
||||
new_group = {
|
||||
'id': str(uuid.uuid4()),
|
||||
'admin': maker_id,
|
||||
'name': user.groupName.strip(),
|
||||
'participants': [user_id],
|
||||
'disableEditing': False,
|
||||
}
|
||||
self._db.groups.insert_one(new_group)
|
||||
else:
|
||||
participants = group.participants
|
||||
if user_id not in participants:
|
||||
participants.append(user_id)
|
||||
self._db.groups.update_one(
|
||||
{"id": group.id},
|
||||
{"$set": {"participants": participants}}
|
||||
)
|
||||
66
modules/gpt.py
Normal file
66
modules/gpt.py
Normal file
@@ -0,0 +1,66 @@
|
||||
import json
|
||||
from logging import getLogger
|
||||
|
||||
from typing import List, Optional, Callable, TypeVar
|
||||
|
||||
from openai.types.chat import ChatCompletionMessageParam
|
||||
from pydantic import BaseModel
|
||||
|
||||
T = TypeVar('T', bound=BaseModel)
|
||||
|
||||
|
||||
class GPT:
    """Thin wrapper around the OpenAI chat API that requests JSON output
    and retries when the model returns something that cannot be parsed or
    mapped onto the caller's model."""

    def __init__(self, openai_client):
        self._client = openai_client
        self._default_model = "gpt-4o-2024-08-06"
        self._logger = getLogger(__name__)

    def prediction(
            self,
            messages: List[ChatCompletionMessageParam],
            map_to_model: Callable,
            json_scheme: str,
            *,
            model: Optional[str] = None,
            temperature: Optional[float] = None,
            max_retries: int = 3
    ) -> List[T] | T | None:
        """Request a JSON completion and map it onto a model object.

        Args:
            messages: Chat history to send.
            map_to_model: Callable turning the parsed JSON dict into the
                caller's model object(s); any exception it raises counts as
                a malformed response and triggers a retry.
            json_scheme: Textual description of the expected JSON shape,
                re-sent to the model on retry.
            model: Override for the default model name.
            temperature: Sampling temperature; forwarded only when given.
            max_retries: Attempts before giving up.

        Returns:
            The mapped object(s), or None when every attempt failed.
        """
        params = {
            "messages": messages,
            "response_format": {"type": "json_object"},
            "model": model if model else self._default_model
        }

        # BUG FIX: the original used `if temperature:`, which silently
        # dropped an explicit temperature of 0.0.
        if temperature is not None:
            params["temperature"] = temperature

        attempt = 0
        while attempt < max_retries:
            result = self._client.chat.completions.create(**params)
            result_content = result.choices[0].message.content
            try:
                result_json = json.loads(result_content)
                return map_to_model(result_json)
            except Exception as e:
                attempt += 1
                self._logger.info(f"GPT returned malformed response: {result_content}\n {str(e)}")
                # Replace the conversation with a repair prompt that quotes
                # the bad output and restates the required schema.
                params["messages"] = [
                    {
                        "role": "user",
                        "content": (
                            "Your previous response wasn't in the json format I've explicitly told you to output. "
                            "In your next response, you will fix it and return me just the json I've asked."
                        )
                    },
                    {
                        "role": "user",
                        "content": (
                            f"Previous response: {result_content}\n"
                            f"JSON format: {json_scheme}"
                        )
                    }
                ]
        # The loop only falls through when every attempt failed.
        self._logger.error("Max retries exceeded!")
        return None
|
||||
5
modules/helper/__init__.py
Normal file
5
modules/helper/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .logger import LoggerHelper
|
||||
|
||||
__all__ = [
|
||||
"LoggerHelper"
|
||||
]
|
||||
97
modules/helper/file_helper.py
Normal file
97
modules/helper/file_helper.py
Normal file
@@ -0,0 +1,97 @@
|
||||
import base64
|
||||
import io
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import uuid
|
||||
from typing import Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
import pypandoc
|
||||
from PIL import Image
|
||||
|
||||
|
||||
class FileHelper:
    """Static helpers for document conversion, PDF rasterisation and
    temporary-file bookkeeping under ./tmp/<path_id>/."""

    # Supposedly pandoc covers a wide range of file extensions; only
    # tested with docx.
    @staticmethod
    def convert_file_to_pdf(input_path: str, output_path: str):
        """Render *input_path* to a compact-page PDF via pandoc."""
        layout_args = [
            '-V', 'geometry:paperwidth=5.5in',
            '-V', 'geometry:paperheight=8.5in',
            '-V', 'geometry:margin=0.5in',
            '-V', 'pagestyle=empty',
        ]
        pypandoc.convert_file(input_path, 'pdf', outputfile=output_path, extra_args=layout_args)

    @staticmethod
    def convert_file_to_html(input_path: str, output_path: str):
        """Convert *input_path* to an HTML file via pandoc."""
        pypandoc.convert_file(input_path, 'html', outputfile=output_path)

    @staticmethod
    def pdf_to_png(path_id: str):
        """Rasterise ./tmp/<path_id>/exercises.pdf into page-*.png files."""
        to_png = "pdftoppm -png exercises.pdf page"
        outcome = subprocess.run(to_png, shell=True, cwd=f'./tmp/{path_id}', capture_output=True, text=True)
        if outcome.returncode != 0:
            raise Exception(
                f"Couldn't convert pdf to png. Failed to run command '{to_png}' -> ```cmd {outcome.stderr}```")

    @staticmethod
    def is_page_blank(image_bytes: bytes, image_threshold=10) -> bool:
        """Heuristic blank-page check: at most *image_threshold* pixels may
        be darker than pure white."""
        with Image.open(io.BytesIO(image_bytes)) as img:
            grayscale = np.array(img.convert('L'))
        non_white_pixels = np.sum(grayscale < 255)
        return non_white_pixels <= image_threshold

    @classmethod
    def _encode_image(cls, image_path: str, image_threshold=10) -> Optional[str]:
        """Return the base64 payload of a PNG page, or None for a blank page."""
        with open(image_path, "rb") as image_file:
            raw = image_file.read()
        if cls.is_page_blank(raw, image_threshold):
            return None
        return base64.b64encode(raw).decode('utf-8')

    @classmethod
    def b64_pngs(cls, path_id: str, files: list[str]):
        """Build `image_url` message entries for every non-blank PNG page."""
        png_messages = []
        for filename in files:
            encoded = cls._encode_image(os.path.join(f'./tmp/{path_id}', filename))
            if not encoded:
                continue
            png_messages.append({
                "type": "image_url",
                "image_url": {
                    "url": f"data:image/png;base64,{encoded}"
                }
            })
        return png_messages

    @staticmethod
    def remove_directory(path):
        """Best-effort recursive removal of a directory; never raises."""
        try:
            if os.path.isdir(path):
                shutil.rmtree(path)
        except Exception as e:
            print(f"An error occurred while trying to remove {path}: {str(e)}")

    @staticmethod
    def remove_file(file_path):
        """Best-effort removal of a single file; never raises."""
        try:
            if os.path.isfile(file_path):
                os.remove(file_path)
        except Exception as e:
            print(f"An error occurred while trying to remove the file {file_path}: {str(e)}")

    @staticmethod
    def save_upload(file) -> Tuple[str, str]:
        """Persist an uploaded file under a fresh ./tmp/<uuid>/ directory.

        Returns (file extension, path_id)."""
        ext = file.filename.split('.')[-1]
        path_id = str(uuid.uuid4())
        os.makedirs(f'./tmp/{path_id}', exist_ok=True)
        file.save(f'./tmp/{path_id}/uploaded.{ext}')
        return ext, path_id
|
||||
23
modules/helper/logger.py
Normal file
23
modules/helper/logger.py
Normal file
@@ -0,0 +1,23 @@
|
||||
import logging
|
||||
from functools import wraps
|
||||
|
||||
|
||||
class LoggerHelper:
    """Logging-related decorator utilities."""

    @staticmethod
    def suppress_loggers():
        """Decorator factory: silence all loggers below ERROR for the
        duration of the wrapped call, restoring the previous root level
        afterwards (even on exception)."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                root = logging.getLogger()
                previous_level = root.level
                root.setLevel(logging.ERROR)
                try:
                    return func(*args, **kwargs)
                finally:
                    root.setLevel(previous_level)
            return wrapper
        return decorator
|
||||
7
modules/training_content/__init__.py
Normal file
7
modules/training_content/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
||||
from .kb import TrainingContentKnowledgeBase
|
||||
from .service import TrainingContentService
|
||||
|
||||
__all__ = [
|
||||
"TrainingContentService",
|
||||
"TrainingContentKnowledgeBase"
|
||||
]
|
||||
29
modules/training_content/dtos.py
Normal file
29
modules/training_content/dtos.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import List
|
||||
|
||||
|
||||
class QueryDTO(BaseModel):
    """One knowledge-base query produced by the LLM."""
    # Embedding collection to search (one of TrainingContentService.TOOLS).
    category: str
    # Free-text query run against that collection.
    text: str
|
||||
|
||||
|
||||
class DetailsDTO(BaseModel):
    """Per-exam-attempt summary generated by the LLM."""
    # Identifier of the exam attempt being described.
    exam_id: str
    # Attempt timestamp; presumably epoch milliseconds — TODO confirm against callers.
    date: int
    # Short one-sentence comment on the student's performance.
    performance_comment: str
    # Longer narrative summary of the attempt.
    detailed_summary: str
|
||||
|
||||
|
||||
class WeakAreaDTO(BaseModel):
    """A weak area identified across all exams."""
    # Name of the weak area (e.g. "Grammar and Syntax").
    area: str
    # Short explanation of the issue.
    comment: str
|
||||
|
||||
|
||||
class TrainingContentDTO(BaseModel):
    """Structured LLM response for a batch of exam attempts."""
    # One entry per exam attempt.
    details: List[DetailsDTO]
    # Weak areas identified across all attempts.
    weak_areas: List[WeakAreaDTO]
    # Knowledge-base queries to run for tips.
    queries: List[QueryDTO]
|
||||
|
||||
|
||||
class TipsDTO(BaseModel):
    """LLM selection of useful tips, by id."""
    # Ids of the tips judged useful for the student.
    tip_ids: List[str]
|
||||
85
modules/training_content/kb.py
Normal file
85
modules/training_content/kb.py
Normal file
@@ -0,0 +1,85 @@
|
||||
import json
|
||||
import os
|
||||
from logging import getLogger
|
||||
from typing import Dict, List
|
||||
|
||||
import faiss
|
||||
import pickle
|
||||
|
||||
|
||||
class TrainingContentKnowledgeBase:
    """FAISS-backed store of study tips, indexed per category.

    Build-time: `create_embeddings_and_save_them` writes one FAISS index
    per category plus a shared metadata pickle under ./faiss/.
    Run-time: `load_indices_and_metadata` loads them and
    `query_knowledge_base` performs nearest-neighbour lookups.
    """

    def __init__(self, embeddings, path: str = 'pathways_2_rw_with_ids.json'):
        # Model exposing .encode(list[str]) -> 2-D array of embeddings.
        self._embedding_model = embeddings
        # Raw tips document; loading is currently disabled by the author.
        self._tips = None  # self._read_json(path)
        self._category_metadata = None
        self._indices = None
        self._logger = getLogger(__name__)

    @staticmethod
    def _read_json(path: str) -> Dict[str, any]:
        """Load and parse a UTF-8 JSON file."""
        with open(path, 'r', encoding="utf-8") as json_file:
            return json.loads(json_file.read())

    def print_category_count(self):
        """Print (and return) the number of tips per normalised category.

        BUG FIX: the original initialised a first-seen category to 0 and
        only incremented on later occurrences, so every category was
        undercounted by one.
        """
        category_tips = {}
        for unit in self._tips['units']:
            for page in unit['pages']:
                for tip in page['tips']:
                    category = tip['category'].lower().replace(" ", "_")
                    category_tips[category] = category_tips.get(category, 0) + 1
        print(category_tips)
        return category_tips

    def create_embeddings_and_save_them(self) -> None:
        """Encode every tip per category, build one FAISS L2 index per
        category and persist indices plus metadata under ./faiss/."""
        category_embeddings = {}
        category_metadata = {}

        for unit in self._tips['units']:
            for page in unit['pages']:
                for tip in page['tips']:
                    category = tip['category'].lower().replace(" ", "_")
                    if category not in category_embeddings:
                        category_embeddings[category] = []
                        category_metadata[category] = []

                    category_embeddings[category].append(tip['embedding'])
                    category_metadata[category].append({"id": tip['id'], "text": tip['text']})

        category_indices = {}
        for category, embeddings in category_embeddings.items():
            # NOTE(review): this encodes the stored `embedding` field again;
            # presumably it holds the tip text to embed — TODO confirm.
            embeddings_array = self._embedding_model.encode(embeddings)
            index = faiss.IndexFlatL2(embeddings_array.shape[1])
            index.add(embeddings_array)
            category_indices[category] = index

            # Persist each category's index as it is built.
            faiss.write_index(index, f"./faiss/{category}_tips_index.faiss")

        with open("./faiss/tips_metadata.pkl", "wb") as f:
            pickle.dump(category_metadata, f)

    def load_indices_and_metadata(
            self,
            directory: str = './faiss',
            suffix: str = '_tips_index.faiss',
            metadata_path: str = './faiss/tips_metadata.pkl'
    ):
        """Load every per-category FAISS index found in *directory* and
        the shared metadata pickle into memory."""
        files = os.listdir(directory)
        self._indices = {}
        for file in files:
            if file.endswith(suffix):
                # The category name is the filename minus the suffix.
                self._indices[file[:-len(suffix)]] = faiss.read_index(f'{directory}/{file}')
                self._logger.info(f'Loaded embeddings for {file[:-len(suffix)]} category.')

        with open(metadata_path, 'rb') as f:
            self._category_metadata = pickle.load(f)
        self._logger.info("Loaded tips metadata")

    def query_knowledge_base(self, query: str, category: str, top_k: int = 5) -> List[Dict[str, str]]:
        """Return the metadata dicts of the *top_k* tips nearest to
        *query* within *category*."""
        query_embedding = self._embedding_model.encode([query])
        index = self._indices[category]
        D, I = index.search(query_embedding, top_k)
        results = [self._category_metadata[category][i] for i in I[0]]
        return results
|
||||
409
modules/training_content/service.py
Normal file
409
modules/training_content/service.py
Normal file
@@ -0,0 +1,409 @@
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from logging import getLogger
|
||||
|
||||
from typing import Dict, List
|
||||
|
||||
from pymongo.database import Database
|
||||
|
||||
from modules.training_content.dtos import TrainingContentDTO, WeakAreaDTO, QueryDTO, DetailsDTO, TipsDTO
|
||||
|
||||
|
||||
class TrainingContentService:
    """Builds personalised training content from a student's exam stats.

    Pipeline (see get_tips): group raw stats into exam attempts, ask the
    LLM for per-attempt summaries and knowledge-base queries, run those
    queries against the FAISS knowledge base, ask the LLM which tips are
    useful, then persist the assembled training document in Mongo.
    """

    # Categories the LLM may target in its "queries" output.
    TOOLS = [
        'critical_thinking',
        'language_for_writing',
        'reading_skills',
        'strategy',
        'words',
        'writing_skills'
    ]
    # strategy word_link ct_focus reading_skill word_partners writing_skill language_for_writing

    def __init__(self, kb, openai, mongo: Database):
        # TrainingContentKnowledgeBase instance used for tip lookups.
        self._training_content_module = kb
        self._db: Database = mongo
        self._logger = getLogger(__name__)
        # GPT wrapper (see modules/gpt.py).
        self._llm = openai

    def get_tips(self, training_content):
        """Run the full pipeline and persist the resulting training doc.

        *training_content* is a dict with "userID" and "stats" keys.
        Returns {"id": <uuid of the stored training document>}.
        """
        user, stats = training_content["userID"], training_content["stats"]
        exam_data, exam_map = self._sort_out_solutions(stats)
        # NOTE: the parameter is rebound here to the LLM's TrainingContentDTO.
        training_content = self._get_exam_details_and_tips(exam_data)
        tips = self._query_kb(training_content.queries)
        usefull_tips = self._get_usefull_tips(exam_data, tips)
        exam_map = self._merge_exam_map_with_details(exam_map, training_content.details)

        weak_areas = {"weak_areas": []}
        for area in training_content.weak_areas:
            weak_areas["weak_areas"].append(area.dict())

        new_id = uuid.uuid4()
        training_doc = {
            'id': new_id,
            'created_at': int(datetime.now().timestamp() * 1000),
            **exam_map,
            **usefull_tips.dict(),
            **weak_areas,
            "user": user
        }
        self._db.training.insert_one(training_doc)
        return {
            "id": new_id
        }

    @staticmethod
    def _merge_exam_map_with_details(exam_map: Dict[str, any], details: List[DetailsDTO]):
        """Merge the LLM's per-attempt details with the locally computed
        per-attempt scores/stat ids into an {"exams": [...]} structure.

        NOTE(review): exam_map is keyed by session_key in
        _sort_out_solutions but indexed here by detail.exam_id — verify
        these actually coincide (the prompt tells the LLM the exam key IS
        the id it should echo back).
        """
        new_exam_map = {"exams": []}
        for detail in details:
            new_exam_map["exams"].append({
                "id": detail.exam_id,
                "date": detail.date,
                "performance_comment": detail.performance_comment,
                "detailed_summary": detail.detailed_summary,
                **exam_map[detail.exam_id]
            })
        return new_exam_map

    def _query_kb(self, queries: List[QueryDTO]):
        """Run each LLM-produced query against the knowledge base.

        The public category names are mapped onto the index names on disk;
        "words" fans out to both word indices. Unknown categories are
        logged and skipped.
        """
        map_categories = {
            "critical_thinking": "ct_focus",
            "language_for_writing": "language_for_writing",
            "reading_skills": "reading_skill",
            "strategy": "strategy",
            "writing_skills": "writing_skill"
        }

        tips = {"tips": []}
        for query in queries:
            if query.category == "words":
                tips["tips"].extend(
                    self._training_content_module.query_knowledge_base(query.text, "word_link")
                )
                tips["tips"].extend(
                    self._training_content_module.query_knowledge_base(query.text, "word_partners")
                )
            else:
                if query.category in map_categories:
                    tips["tips"].extend(
                        self._training_content_module.query_knowledge_base(query.text, map_categories[query.category])
                    )
                else:
                    self._logger.info(f"GTP tried to query knowledge base for {query.category} and it doesn't exist.")
        return tips

    def _get_exam_details_and_tips(self, exam_data: Dict[str, any]) -> TrainingContentDTO:
        """Ask the LLM for per-attempt summaries, weak areas and KB queries."""
        json_schema = (
            '{ "details": [{"exam_id": "", "date": 0, "performance_comment": "", "detailed_summary": ""}],'
            ' "weak_areas": [{"area": "", "comment": ""}], "queries": [{"text": "", "category": ""}] }'
        )
        messages = [
            {
                "role": "user",
                "content": (
                    f"I'm going to provide you with exam data, you will take the exam data and fill this json "
                    f'schema : {json_schema}. "performance_comment" is a short sentence that describes the '
                    'students\'s performance and main mistakes in a single exam, "detailed_summary" is a detailed '
                    'summary of the student\'s performance, "weak_areas" are identified areas'
                    ' across all exams which need to be improved upon, for example, area "Grammar and Syntax" comment "Issues'
                    ' with sentence structure and punctuation.", the "queries" field is where you will write queries '
                    'for tips that will be displayed to the student, the category attribute is a collection of '
                    'embeddings and the text will be the text used to query the knowledge base. The categories are '
                    f'the following [{", ".join(self.TOOLS)}]. The exam data will be a json where the key of the field '
                    '"exams" is the exam id, an exam can be composed of multiple modules or single modules. The student'
                    ' will see your response so refrain from using phrasing like "The student" did x, y and z. If the '
                    'field "answer" in a question is an empty array "[]", then the student didn\'t answer any question '
                    'and you must address that in your response. Also questions aren\'t modules, the only modules are: '
                    'level, speaking, writing, reading and listening. The details array needs to be tailored to the '
                    'exam attempt, even if you receive the same exam you must treat as different exams by their id.'
                    'Don\'t make references to an exam by it\'s id, the GUI will handle that so the student knows '
                    'which is the exam your comments and summary are referencing too. Even if the student hasn\'t '
                    'submitted no answers for an exam, you must still fill the details structure addressing that fact.'
                )
            },
            {
                "role": "user",
                "content": f'Exam Data: {str(exam_data)}'
            }
        ]
        return self._llm.prediction(messages, self._map_gpt_response, json_schema)

    def _get_usefull_tips(self, exam_data: Dict[str, any], tips: Dict[str, any]) -> TipsDTO:
        """Ask the LLM to pick which retrieved tips are useful for this
        student, returning their ids."""
        json_schema = (
            '{ "tip_ids": [] }'
        )
        messages = [
            {
                "role": "user",
                "content": (
                    f"I'm going to provide you with tips and I want you to return to me the tips that "
                    f"can be usefull for the student that made the exam that I'm going to send you, return "
                    f"me the tip ids in this json format {json_schema}."
                )
            },
            {
                "role": "user",
                "content": f'Exam Data: {str(exam_data)}'
            },
            {
                "role": "user",
                "content": f'Tips: {str(tips)}'
            }
        ]
        return self._llm.prediction(messages, lambda response: TipsDTO(**response), json_schema)

    @staticmethod
    def _map_gpt_response(response: Dict[str, any]) -> TrainingContentDTO:
        """Map the raw LLM JSON dict onto a TrainingContentDTO.

        Raises (pydantic ValidationError / KeyError) on malformed input,
        which triggers GPT's retry loop.
        """
        parsed_response = {
            "details": [DetailsDTO(**detail) for detail in response["details"]],
            "weak_areas": [WeakAreaDTO(**area) for area in response["weak_areas"]],
            "queries": [QueryDTO(**query) for query in response["queries"]]
        }
        return TrainingContentDTO(**parsed_response)

    def _sort_out_solutions(self, stats):
        """Group raw stat records into per-session, per-module exam data.

        Returns a tuple of:
        - {"exams": {...}} — nested session -> module -> exam_id structure
          with resolved questions/answers, fed to the LLM;
        - exam_map — per-session stat ids, aggregate score (percent) and
          last module seen.
        """
        # First pass: bucket stats by (date-user) session and module.
        grouped_stats = {}
        for stat in stats:
            session_key = f'{str(stat["date"])}-{stat["user"]}'
            module = stat["module"]
            exam_id = stat["exam"]

            if session_key not in grouped_stats:
                grouped_stats[session_key] = {}
            if module not in grouped_stats[session_key]:
                grouped_stats[session_key][module] = {
                    "stats": [],
                    "exam_id": exam_id
                }
            grouped_stats[session_key][module]["stats"].append(stat)

        # Second pass: resolve each stat against its exam document.
        exercises = {}
        exam_map = {}
        for session_key, modules in grouped_stats.items():
            exercises[session_key] = {}
            for module, module_stats in modules.items():
                exercises[session_key][module] = {}

                exam_id = module_stats["exam_id"]
                if exam_id not in exercises[session_key][module]:
                    exercises[session_key][module][exam_id] = {"date": None, "exercises": []}

                exam_total_questions = 0
                exam_total_correct = 0

                for stat in module_stats["stats"]:
                    exam_total_questions += stat["score"]["total"]
                    exam_total_correct += stat["score"]["correct"]
                    exercises[session_key][module][exam_id]["date"] = stat["date"]

                    if session_key not in exam_map:
                        exam_map[session_key] = {"stat_ids": [], "score": 0}
                    exam_map[session_key]["stat_ids"].append(stat["id"])

                    # The module name doubles as the Mongo collection name.
                    exam = self._get_doc_by_id(module, exam_id)
                    if module == "listening":
                        exercises[session_key][module][exam_id]["exercises"].extend(
                            self._get_listening_solutions(stat, exam))
                    elif module == "reading":
                        exercises[session_key][module][exam_id]["exercises"].extend(
                            self._get_reading_solutions(stat, exam))
                    elif module == "writing":
                        exercises[session_key][module][exam_id]["exercises"].extend(
                            self._get_writing_prompts_and_answers(stat, exam)
                        )
                    elif module == "speaking":
                        exercises[session_key][module][exam_id]["exercises"].extend(
                            self._get_speaking_solutions(stat, exam)
                        )
                    elif module == "level":
                        exercises[session_key][module][exam_id]["exercises"].extend(
                            self._get_level_solutions(stat, exam)
                        )

                # NOTE(review): raises ZeroDivisionError if no stat carries
                # score totals — confirm totals are always non-zero.
                exam_map[session_key]["score"] = round((exam_total_correct / exam_total_questions) * 100)
                exam_map[session_key]["module"] = module
        # Debug artefact: dump the resolved structure to disk.
        with open('exam_result.json', 'w') as file:
            json.dump({"exams": exercises}, file, indent=4)

        return {"exams": exercises}, exam_map

    def _get_writing_prompts_and_answers(self, stat, exam):
        """Pair each writing answer in *stat* with its exam prompt."""
        result = []
        try:
            exercises = []
            for solution in stat['solutions']:
                answer = solution['solution']
                exercise_id = solution['id']
                exercises.append({
                    "exercise_id": exercise_id,
                    "answer": answer
                })
            for exercise in exercises:
                for exam_exercise in exam["exercises"]:
                    if exam_exercise["id"] == exercise["exercise_id"]:
                        result.append({
                            "exercise": exam_exercise["prompt"],
                            "answer": exercise["answer"]
                        })

        except KeyError as e:
            self._logger.warning(f"Malformed stat object: {str(e)}")

        return result

    @staticmethod
    def _get_mc_question(exercise, stat):
        """Build the question/answer payload for a multiple-choice exercise.

        NOTE(review): the de-shuffled `answer` list computed below is never
        used — the return always carries the raw stat["solutions"]. Looks
        unintentional; confirm before relying on shuffle handling.
        """
        shuffle_maps = stat.get("shuffleMaps", [])
        answer = stat["solutions"] if len(shuffle_maps) == 0 else []
        if len(shuffle_maps) != 0:
            for solution in stat["solutions"]:
                shuffle_map = [
                    item["map"] for item in shuffle_maps
                    if item["questionID"] == solution["question"]
                ]
                answer.append({
                    "question": solution["question"],
                    "option": shuffle_map[solution["option"]]
                })
        return {
            "question": exercise["prompt"],
            "exercise": exercise["questions"],
            "answer": stat["solutions"]
        }

    @staticmethod
    def _swap_key_name(d, original_key, new_key):
        """Rename a dict key in place and return the dict."""
        d[new_key] = d.pop(original_key)
        return d

    def _get_level_solutions(self, stat, exam):
        """Resolve a level-module stat against its exam document."""
        result = []
        try:
            for part in exam["parts"]:
                for exercise in part["exercises"]:
                    if exercise["id"] == stat["exercise"]:
                        if stat["type"] == "fillBlanks":
                            result.append({
                                "prompt": exercise["prompt"],
                                "template": exercise["text"],
                                "words": exercise["words"],
                                "solutions": exercise["solutions"],
                                # Level stats store answers under
                                # 'solution'; normalise to 'option'.
                                "answer": [
                                    self._swap_key_name(item, 'solution', 'option')
                                    for item in stat["solutions"]
                                ]
                            })
                        elif stat["type"] == "multipleChoice":
                            result.append(self._get_mc_question(exercise, stat))
        except KeyError as e:
            self._logger.warning(f"Malformed stat object: {str(e)}")
        return result

    def _get_listening_solutions(self, stat, exam):
        """Resolve a listening-module stat against its exam document."""
        result = []
        try:
            for part in exam["parts"]:
                for exercise in part["exercises"]:
                    if exercise["id"] == stat["exercise"]:
                        if stat["type"] == "writeBlanks":
                            result.append({
                                "question": exercise["prompt"],
                                "template": exercise["text"],
                                "solution": exercise["solutions"],
                                "answer": stat["solutions"]
                            })
                        elif stat["type"] == "fillBlanks":
                            result.append({
                                "question": exercise["prompt"],
                                "template": exercise["text"],
                                "words": exercise["words"],
                                "solutions": exercise["solutions"],
                                "answer": stat["solutions"]
                            })
                        elif stat["type"] == "multipleChoice":
                            result.append(self._get_mc_question(exercise, stat))

        except KeyError as e:
            self._logger.warning(f"Malformed stat object: {str(e)}")
        return result

    @staticmethod
    def _find_shuffle_map(shuffle_maps, question_id):
        """Return the shuffle map for *question_id*, or None."""
        return next((item["map"] for item in shuffle_maps if item["questionID"] == question_id), None)

    def _get_speaking_solutions(self, stat, exam):
        """Build the per-prompt transcripts and evaluator comments for a
        speaking-module stat. Returns a single-element list."""
        result = {}
        try:
            result = {
                "comments": {
                    key: value['comment']
                    for key, value in stat['solutions'][0]['evaluation']['task_response'].items()
                },
                "exercises": {}
            }

            for exercise in exam["exercises"]:
                if exercise["id"] == stat["exercise"]:
                    if stat["type"] == "interactiveSpeaking":
                        # First fill in the prompts, then attach the
                        # matching transcript_N evaluation entries.
                        for i in range(len(exercise["prompts"])):
                            result["exercises"][f"exercise_{i+1}"] = {
                                "question": exercise["prompts"][i]["text"]
                            }
                        for i in range(len(exercise["prompts"])):
                            answer = stat['solutions'][0]["evaluation"].get(f'transcript_{i+1}', '')
                            result["exercises"][f"exercise_{i+1}"]["answer"] = answer
                    elif stat["type"] == "speaking":
                        result["exercises"]["exercise_1"] = {
                            "question": exercise["text"],
                            "answer": stat['solutions'][0]["evaluation"].get(f'transcript', '')
                        }
        except KeyError as e:
            self._logger.warning(f"Malformed stat object: {str(e)}")
        return [result]

    def _get_reading_solutions(self, stat, exam):
        """Resolve a reading-module stat against its exam document,
        attaching the part's source text to every exercise."""
        result = []
        try:
            for part in exam["parts"]:
                text = part["text"]
                for exercise in part["exercises"]:
                    if exercise["id"] == stat["exercise"]:
                        if stat["type"] == "fillBlanks":
                            result.append({
                                "text": text,
                                "question": exercise["prompt"],
                                "template": exercise["text"],
                                "words": exercise["words"],
                                "solutions": exercise["solutions"],
                                "answer": stat["solutions"]
                            })
                        elif stat["type"] == "writeBlanks":
                            result.append({
                                "text": text,
                                "question": exercise["prompt"],
                                "template": exercise["text"],
                                "solutions": exercise["solutions"],
                                "answer": stat["solutions"]
                            })
                        elif stat["type"] == "trueFalse":
                            result.append({
                                "text": text,
                                "questions": exercise["questions"],
                                "answer": stat["solutions"]
                            })
                        elif stat["type"] == "matchSentences":
                            result.append({
                                "text": text,
                                "question": exercise["prompt"],
                                "sentences": exercise["sentences"],
                                "options": exercise["options"],
                                "answer": stat["solutions"]
                            })
        except KeyError as e:
            self._logger.warning(f"Malformed stat object: {str(e)}")
        return result

    def _get_doc_by_id(self, collection: str, doc_id: str):
        """Fetch a single document by its 'id' field from *collection*."""
        doc = self._db[collection].find_one({"id": doc_id})
        return doc
|
||||
5
modules/upload_level/__init__.py
Normal file
5
modules/upload_level/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .service import UploadLevelService
|
||||
|
||||
__all__ = [
|
||||
"UploadLevelService"
|
||||
]
|
||||
57
modules/upload_level/exam_dtos.py
Normal file
57
modules/upload_level/exam_dtos.py
Normal file
@@ -0,0 +1,57 @@
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import List, Dict, Union, Optional, Any
|
||||
from uuid import uuid4, UUID
|
||||
|
||||
|
||||
class Option(BaseModel):
    """One selectable answer option."""
    id: str
    text: str
|
||||
|
||||
|
||||
class MultipleChoiceQuestion(BaseModel):
    """A single multiple-choice question with its options and solution."""
    id: str
    prompt: str
    # Rendering variant; defaults to plain text.
    variant: str = "text"
    # Id of the correct option.
    solution: str
    options: List[Option]
|
||||
|
||||
|
||||
class MultipleChoiceExercise(BaseModel):
    """A multiple-choice exercise holding several questions."""
    id: UUID = Field(default_factory=uuid4)
    type: str = "multipleChoice"
    prompt: str = "Select the appropriate option."
    questions: List[MultipleChoiceQuestion]
    # Filled in when a student submits answers; empty on creation.
    userSolutions: List = Field(default_factory=list)
|
||||
|
||||
|
||||
class FillBlanksWord(BaseModel):
    """The candidate words offered for one blank."""
    id: str
    # Option-id -> word text.
    options: Dict[str, str]
|
||||
|
||||
|
||||
class FillBlanksSolution(BaseModel):
    """The correct word for one blank."""
    id: str
    solution: str
|
||||
|
||||
|
||||
class FillBlanksExercise(BaseModel):
    """A fill-in-the-blanks exercise over a templated text."""
    id: UUID = Field(default_factory=uuid4)
    type: str = "fillBlanks"
    # "mc" = blanks are chosen from multiple-choice word lists.
    variant: str = "mc"
    prompt: str = "Click a blank to select the appropriate word for it."
    # Text containing the blank placeholders.
    text: str
    solutions: List[FillBlanksSolution]
    words: List[FillBlanksWord]
    # Filled in when a student submits answers; empty on creation.
    userSolutions: List = Field(default_factory=list)
|
||||
|
||||
|
||||
# Union of every supported exercise payload.
Exercise = Union[MultipleChoiceExercise, FillBlanksExercise]
|
||||
|
||||
|
||||
class Part(BaseModel):
    """One section of an exam: its exercises plus optional shared context."""
    exercises: List[Exercise]
    context: Optional[str] = Field(default=None)
|
||||
|
||||
|
||||
class Exam(BaseModel):
    """A generated exam: an ordered list of parts."""
    parts: List[Part]
|
||||
66
modules/upload_level/mapper.py
Normal file
66
modules/upload_level/mapper.py
Normal file
@@ -0,0 +1,66 @@
|
||||
from typing import Dict, Any
|
||||
|
||||
from pydantic import ValidationError
|
||||
|
||||
from modules.upload_level.exam_dtos import (
|
||||
MultipleChoiceExercise,
|
||||
FillBlanksExercise,
|
||||
Part, Exam
|
||||
)
|
||||
from modules.upload_level.sheet_dtos import Sheet, Option, MultipleChoiceQuestion, FillBlanksWord
|
||||
|
||||
|
||||
class ExamMapper:
    """Translates parsed LLM JSON responses into exam / sheet models."""

    @staticmethod
    def map_to_exam_model(response: Dict[str, Any]) -> Exam:
        """Build an Exam model from a parsed LLM response.

        Raises:
            ValueError: when a part contains an unknown exercise type.
        """
        parts = []
        for part in response['parts']:
            part_exercises = part['exercises']
            context = part.get('context', None)

            exercises = []
            for exercise in part_exercises:
                exercise_type = exercise['type']
                if exercise_type == 'multipleChoice':
                    exercise_model = MultipleChoiceExercise(**exercise)
                elif exercise_type == 'fillBlanks':
                    exercise_model = FillBlanksExercise(**exercise)
                else:
                    # BUG FIX: pydantic's ValidationError cannot be
                    # constructed from a bare message (its constructor needs
                    # errors + model), so the original `raise` itself blew
                    # up with a TypeError. ValueError conveys the intent.
                    raise ValueError(f"Unknown exercise type: {exercise_type}")

                exercises.append(exercise_model)

            # Only pass context through when present so the model default
            # (None) applies otherwise.
            part_kwargs = {"exercises": exercises}
            if context is not None:
                part_kwargs["context"] = context

            part_model = Part(**part_kwargs)
            parts.append(part_model)

        return Exam(parts=parts)

    @staticmethod
    def map_to_sheet(response: Dict[str, Any]) -> Sheet:
        """Build a Sheet from a parsed LLM response.

        Known component types are mapped onto their models; anything else
        is passed through untouched.
        """
        components = []

        for item in response["components"]:
            component_type = item["type"]

            if component_type == "multipleChoice":
                options = [Option(id=opt["id"], text=opt["text"]) for opt in item["options"]]
                components.append(MultipleChoiceQuestion(
                    id=item["id"],
                    prompt=item["prompt"],
                    variant=item.get("variant", "text"),
                    options=options
                ))
            elif component_type == "fillBlanks":
                components.append(FillBlanksWord(
                    id=item["id"],
                    options=item["options"]
                ))
            else:
                components.append(item)

        return Sheet(components=components)
|
||||
385
modules/upload_level/service.py
Normal file
385
modules/upload_level/service.py
Normal file
@@ -0,0 +1,385 @@
|
||||
import json
|
||||
import os
|
||||
import uuid
|
||||
from logging import getLogger
|
||||
|
||||
from typing import Dict, Any, Tuple, Callable
|
||||
|
||||
import pdfplumber
|
||||
|
||||
from modules import GPT
|
||||
from modules.helper.file_helper import FileHelper
|
||||
from modules.helper import LoggerHelper
|
||||
from modules.upload_level.exam_dtos import Exam
|
||||
from modules.upload_level.mapper import ExamMapper
|
||||
from modules.upload_level.sheet_dtos import Sheet
|
||||
|
||||
|
||||
class UploadLevelService:
    """Turns an uploaded exercise-sheet file into a structured exam dict.

    Two pipelines exist: files whose PDF rendering contains no images are
    converted to HTML and scraped in a single LLM call; files with images are
    rasterized to PNG pages, OCR-scanned in overlapping two-page batches, and
    the batch results are merged in a final LLM call.
    """

    def __init__(self, openai: GPT):
        self._logger = getLogger(__name__)
        # LLM client used for all prediction calls.
        self._llm = openai

    def generate_level_from_file(self, file) -> Dict[str, Any] | None:
        """Run the full upload -> exam pipeline for one uploaded file.

        Returns the exam as a plain dict (None-valued fields excluded, question
        ids renumbered sequentially), or None when no usable response came back.
        """
        # Persist the upload under ./tmp/<path_id>/ and normalise it to PDF.
        ext, path_id = FileHelper.save_upload(file)
        FileHelper.convert_file_to_pdf(
            f'./tmp/{path_id}/uploaded.{ext}', f'./tmp/{path_id}/exercises.pdf'
        )
        file_has_images = self._check_pdf_for_images(f'./tmp/{path_id}/exercises.pdf')

        # Image-free files go through the cheaper HTML-scraping path.
        if not file_has_images:
            FileHelper.convert_file_to_html(f'./tmp/{path_id}/uploaded.{ext}', f'./tmp/{path_id}/exercises.html')

        completion: Callable[[str], Exam] = self._png_completion if file_has_images else self._html_completion
        response = completion(path_id)

        # Working files are no longer needed whatever the outcome.
        FileHelper.remove_directory(f'./tmp/{path_id}')

        if response:
            return self.fix_ids(response.dict(exclude_none=True))
        return None

    @staticmethod
    @LoggerHelper.suppress_loggers()
    def _check_pdf_for_images(pdf_path: str) -> bool:
        """Return True if any page of the PDF at *pdf_path* contains an image."""
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                if page.images:
                    return True
        return False

    def _level_json_schema(self) -> Dict[str, Any]:
        """Example-based output schema embedded into the LLM prompts."""
        return {
            "parts": [
                {
                    "context": "<this attribute is optional you may exclude it if not required>",
                    "exercises": [
                        self._multiple_choice_html(),
                        self._passage_blank_space_html()
                    ]
                }
            ]
        }

    def _html_completion(self, path_id: str) -> Exam:
        """Scrape the converted HTML sheet into an Exam in a single LLM call."""
        with open(f'./tmp/{path_id}/exercises.html', 'r', encoding='utf-8') as f:
            html = f.read()

        return self._llm.prediction(
            [self._gpt_instructions_html(),
             {
                 "role": "user",
                 "content": html
             }
             ],
            ExamMapper.map_to_exam_model,
            str(self._level_json_schema())
        )

    def _gpt_instructions_html(self) -> Dict[str, Any]:
        """System prompt for the HTML-scraping (and batch-merging) LLM call."""
        return {
            "role": "system",
            "content": (
                'You are GPT Scraper and your job is to clean dirty html into clean usable JSON formatted data.'
                'Your current task is to scrape html english questions sheets.\n\n'

                'In the question sheet you will only see 4 types of question:\n'
                '- blank space multiple choice\n'
                '- underline multiple choice\n'
                '- reading passage blank space multiple choice\n'
                '- reading passage multiple choice\n\n'

                'For the first two types of questions the template is the same but the question prompts differ, '
                'whilst in the blank space multiple choice you must include in the prompt the blank spaces with '
                'multiple "_", in the underline you must include in the prompt the <u></u> to '
                'indicate the underline and the options a, b, c, d must be the ordered underlines in the prompt.\n\n'

                'For the reading passage exercise you must handle the formatting of the passages. If it is a '
                'reading passage with blank spaces you will see blanks represented with (question id) followed by a '
                'line and your job is to replace the brackets with the question id and line with "{{question id}}" '
                'with 2 newlines between paragraphs. For the reading passages without blanks you must remove '
                'any numbers that may be there to specify paragraph numbers or line numbers, and place 2 newlines '
                'between paragraphs.\n\n'

                'IMPORTANT: Note that for the reading passages, the html might not reflect the actual paragraph '
                'structure, don\'t format the reading passages paragraphs only by the <p></p> tags, try to figure '
                'out the best paragraph separation possible.'

                'You will place all the information in a single JSON: {"parts": [{"exercises": [{...}], "context": ""}]}\n '
                'Where {...} are the exercises templates for each part of a question sheet and the optional field '
                'context.'

                'IMPORTANT: The question sheet may be divided by sections but you need to only consider the parts, '
                'so that you can group the exercises by the parts that are in the html, this is crucial since only '
                'reading passage multiple choice require context and if the context is included in parts where it '
                'is not required the UI will be messed up. Some make sure to correctly group the exercises by parts.\n'

                'The templates for the exercises are the following:\n'
                '- blank space multiple choice, underline multiple choice and reading passage multiple choice: '
                f'{self._multiple_choice_html()}\n'
                f'- reading passage blank space multiple choice: {self._passage_blank_space_html()}\n'

                'IMPORTANT: For the reading passage multiple choice the context field must be set with the reading '
                'passages without paragraphs or line numbers, with 2 newlines between paragraphs, for the other '
                'exercises exclude the context field.'
            )
        }

    @staticmethod
    def _multiple_choice_html() -> Dict[str, Any]:
        """Template of a solved multiple-choice exercise, embedded in prompts."""
        return {
            "type": "multipleChoice",
            "prompt": "Select the appropriate option.",
            "questions": [
                {
                    "id": "<the question id>",
                    "prompt": "<the question>",
                    "solution": "<the option id solution>",
                    "options": [
                        {
                            "id": "A",
                            "text": "<the a option>"
                        },
                        {
                            "id": "B",
                            "text": "<the b option>"
                        },
                        {
                            "id": "C",
                            "text": "<the c option>"
                        },
                        {
                            "id": "D",
                            "text": "<the d option>"
                        }
                    ]
                }
            ]
        }

    @staticmethod
    def _passage_blank_space_html() -> Dict[str, Any]:
        """Template of a solved fill-blanks passage exercise, embedded in prompts."""
        return {
            "type": "fillBlanks",
            "variant": "mc",
            "prompt": "Click a blank to select the appropriate word for it.",
            "text": (
                "<The whole text for the exercise with replacements for blank spaces and their "
                "ids with {{<question id>}} with 2 newlines between paragraphs>"
            ),
            "solutions": [
                {
                    "id": "<question id>",
                    "solution": "<the option that holds the solution>"
                }
            ],
            "words": [
                {
                    "id": "<question id>",
                    "options": {
                        "A": "<a option>",
                        "B": "<b option>",
                        "C": "<c option>",
                        "D": "<d option>"
                    }
                }
            ]
        }

    def _png_completion(self, path_id: str) -> Exam:
        """OCR the PDF page images batch-by-batch, then merge batches into an Exam."""
        FileHelper.pdf_to_png(path_id)

        # Collect and numerically sort the rendered "page-<n>.png" files.
        tmp_files = os.listdir(f'./tmp/{path_id}')
        pages = [f for f in tmp_files if f.startswith('page-') and f.endswith('.png')]
        pages.sort(key=lambda f: int(f.split('-')[1].split('.')[0]))

        json_schema = {
            "components": [
                {"type": "part", "part": "<name or number of the part>"},
                self._multiple_choice_png(),
                {"type": "blanksPassage", "text": (
                    "<The whole text for the exercise with replacements for blank spaces and their "
                    "ids with {{<question id>}} with 2 newlines between paragraphs>"
                )},
                {"type": "passage", "context": (
                    "<reading passages without paragraphs or line numbers, with 2 newlines between paragraphs>"
                )},
                self._passage_blank_space_png()
            ]
        }

        components = []

        # Overlapping two-page sliding window: page i is scanned together with
        # page i+1 so content split across a page boundary is seen whole; the
        # merge step deduplicates the overlap.
        for i in range(len(pages)):
            current_page = pages[i]
            next_page = pages[i + 1] if i + 1 < len(pages) else None
            batch = [current_page, next_page] if next_page else [current_page]

            sheet = self._png_batch(path_id, batch, json_schema)
            sheet.batch = i + 1
            components.append(sheet.dict())

        batches = {"batches": components}
        # NOTE(review): writes output.json into the process CWD on every run —
        # looks like a leftover debug dump; confirm whether it is intentional.
        with open('output.json', 'w') as json_file:
            json.dump(batches, json_file, indent=4)

        return self._batches_to_exam_completion(batches)

    def _png_batch(self, path_id: str, files: list[str], json_schema) -> Sheet:
        """Run one OCR LLM call over the given page images and map it to a Sheet."""
        return self._llm.prediction(
            [self._gpt_instructions_png(),
             {
                 "role": "user",
                 "content": [
                     *FileHelper.b64_pngs(path_id, files)
                 ]
             }
             ],
            ExamMapper.map_to_sheet,
            str(json_schema)
        )

    def _gpt_instructions_png(self) -> Dict[str, Any]:
        """System prompt for the per-batch image OCR call."""
        return {
            "role": "system",
            "content": (
                'You are GPT OCR and your job is to scan image text data and format it to JSON format.'
                'Your current task is to scan english questions sheets.\n\n'

                'You will place all the information in a single JSON: {"components": [{...}]} where {...} is a set of '
                'sheet components you will retrieve from the images, the components and their corresponding JSON '
                'templates are as follows:\n'

                '- Part, a standalone part or part of a section of the question sheet: '
                '{"type": "part", "part": "<name or number of the part>"}\n'

                '- Multiple Choice Question, there are three types of multiple choice questions that differ on '
                'the prompt field of the template: blanks, underlines and normal. '

                'In the blanks prompt you must leave 5 underscores to represent the blank space. '
                'In the underlines questions the objective is to pick the words that are incorrect in the given '
                'sentence, for these questions you must wrap the answer to the question with the html tag <u></u>, '
                'choose 3 other words to wrap in <u></u>, place them in the prompt field and use the underlined words '
                'in the order they appear in the question for the options A to D, disreguard options that might be '
                'included underneath the underlines question and use the ones you wrapped in <u></u>.'
                'In normal you just leave the question as is. '

                f'The template for multiple choice questions is the following: {self._multiple_choice_png()}.\n'

                '- Reading Passages, there are two types of reading passages. Reading passages where you will see '
                'blanks represented by a (question id) followed by a line, you must format these types of reading '
                'passages to be only the text with the brackets that have the question id and line replaced with '
                '"{{question id}}", also place 2 newlines between paragraphs. For the reading passages without blanks '
                'you must remove any numbers that may be there to specify paragraph numbers or line numbers, '
                'and place 2 newlines between paragraphs. '

                'For the reading passages with blanks the template is: {"type": "blanksPassage", '
                '"text": "<The whole text for the exercise with replacements for blank spaces and their '
                'ids that are enclosed in brackets with {{<question id>}} also place 2 newlines between paragraphs>"}. '

                'For the reading passage without blanks is: {"type": "passage", "context": "<reading passages without '
                'paragraphs or line numbers, with 2 newlines between paragraphs>"}\n'

                '- Blanks Options, options for a blanks reading passage exercise, this type of component is a group of '
                'options with the question id and the options from a to d. The template is: '
                f'{self._passage_blank_space_png()}\n'

                'IMPORTANT: You must place the components in the order that they were given to you. If an exercise or '
                'reading passages are cut off don\'t include them in the JSON.'
            )
        }

    def _multiple_choice_png(self) -> Dict[str, Any]:
        """Single-question OCR template: the HTML question template, typed, unsolved."""
        # _multiple_choice_html() returns a fresh dict each call, so mutating
        # the extracted question here does not leak between calls.
        multiple_choice = self._multiple_choice_html()["questions"][0]
        multiple_choice["type"] = "multipleChoice"
        multiple_choice.pop("solution")
        return multiple_choice

    def _passage_blank_space_png(self) -> Dict[str, Any]:
        """Blanks-options OCR template: one typed "words" entry from the HTML template."""
        passage_blank_space = self._passage_blank_space_html()["words"][0]
        passage_blank_space["type"] = "fillBlanks"
        return passage_blank_space

    def _batches_to_exam_completion(self, batches: Dict[str, Any]) -> Exam:
        """Merge the per-page OCR batches into a single solved Exam via the LLM."""
        # NOTE(review): this call uses _gpt_instructions_html() while the
        # batch-merging prompt _gpt_instructions_batches() below is never used —
        # confirm whether _gpt_instructions_batches() was meant to be passed here.
        return self._llm.prediction(
            [self._gpt_instructions_html(),
             {
                 "role": "user",
                 "content": str(batches)
             }
             ],
            ExamMapper.map_to_exam_model,
            str(self._level_json_schema())
        )

    def _gpt_instructions_batches(self) -> Dict[str, Any]:
        """System prompt for merging/deduplicating/solving OCR batches.

        NOTE(review): currently unreferenced — see _batches_to_exam_completion.
        """
        return {
            "role": "system",
            "content": (
                'You are helpfull assistant. Your task is to merge multiple batches of english question sheet '
                'components and solve the questions. Each batch may contain overlapping content with the previous '
                'batch, or close enough content which needs to be excluded. The components are as follows:'

                '- Part, a standalone part or part of a section of the question sheet: '
                '{"type": "part", "part": "<name or number of the part>"}\n'

                '- Multiple Choice Question, there are three types of multiple choice questions that differ on '
                'the prompt field of the template: blanks, underlines and normal. '

                'In a blanks question, the prompt has underscores to represent the blank space, you must select the '
                'appropriate option to solve it.'

                'In a underlines question, the prompt has 4 underlines represented by the html tags <u></u>, you must '
                'select the option that makes the prompt incorrect to solve it. If the options order doesn\'t reflect '
                'the order in which the underlines appear in the prompt you will need to fix it.'

                'In a normal question there isn\'t either blanks or underlines in the prompt, you should just '
                'select the appropriate solution.'

                f'The template for these questions is the same: {self._multiple_choice_png()}\n'

                '- Reading Passages, there are two types of reading passages with different templates. The one with '
                'type "blanksPassage" where the text field holds the passage and a blank is represented by '
                '{{<some number>}} and the other one with type "passage" that has the context field with just '
                'reading passages. For both of these components you will have to remove any additional data that might '
                'be related to a question description and also remove some "(<question id>)" and "_" from blanksPassage'
                ' if there are any. These components are used in conjunction with other ones.'

                '- Blanks Options, options for a blanks reading passage exercise, this type of component is a group of '
                'options with the question id and the options from a to d. The template is: '
                f'{self._passage_blank_space_png()}\n\n'

                'Now that you know the possible components here\'s what I want you to do:\n'
                '1. Remove duplicates. A batch will have duplicates of other batches and the components of '
                'the next batch should always take precedence over the previous one batch, what I mean by this is that '
                'if batch 1 has, for example, multiple choice question with id 10 and the next one also has id 10, '
                'you pick the next one.\n'
                '2. Solve the exercises. There are 4 types of exercises, the 3 multipleChoice variants + a fill blanks '
                'exercise. For the multiple choice question follow the previous instruction to solve them and place '
                f'them in this format: {self._multiple_choice_html()}. For the fill blanks exercises you need to match '
                'the correct blanksPassage to the correct fillBlanks options and then pick the correct option. Here is '
                f'the template for this exercise: {self._passage_blank_space_html()}.\n'
                f'3. Restructure the JSON to match this template: {self._level_json_schema()}. You must group the exercises by '
                'the parts in the order they appear in the batches components. The context field of a part is the '
                'context of a passage component that has text relevant to normal multiple choice questions.\n'

                'Do your utmost to fullfill the requisites, make sure you include all non-duplicate questions'
                'in your response and correctly structure the JSON.'
            )
        }

    @staticmethod
    def fix_ids(response: Dict[str, Any]) -> Dict[str, Any]:
        """Renumber every question id sequentially across the whole exam, in place.

        Mutates and returns *response*. NOTE(review): assumes each fillBlanks
        exercise has "words" and "solutions" as parallel, equal-length lists —
        an IndexError otherwise; confirm the LLM output guarantees this.
        """
        counter = 1
        for part in response["parts"]:
            for exercise in part["exercises"]:
                if exercise["type"] == "multipleChoice":
                    for question in exercise["questions"]:
                        question["id"] = counter
                        counter += 1
                if exercise["type"] == "fillBlanks":
                    # Blank ids and their matching solution ids advance together.
                    for i in range(len(exercise["words"])):
                        exercise["words"][i]["id"] = counter
                        exercise["solutions"][i]["id"] = counter
                        counter += 1
        return response
|
||||
29
modules/upload_level/sheet_dtos.py
Normal file
29
modules/upload_level/sheet_dtos.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from pydantic import BaseModel
|
||||
from typing import List, Dict, Union, Any, Optional
|
||||
|
||||
|
||||
class Option(BaseModel):
    """A single multiple-choice option: an id (e.g. "A") and its display text."""

    id: str
    text: str
|
||||
|
||||
|
||||
class MultipleChoiceQuestion(BaseModel):
    """One multiple-choice question extracted from a scanned sheet page."""

    # Component discriminator used when rebuilding the exam from sheets.
    type: str = "multipleChoice"
    id: str
    prompt: str
    # Presentation hint; defaults to "text" when the source provides none.
    variant: str = "text"
    options: List[Option]
|
||||
|
||||
|
||||
class FillBlanksWord(BaseModel):
    """Candidate words for one blank of a fill-blanks passage."""

    # Component discriminator used when rebuilding the exam from sheets.
    type: str = "fillBlanks"
    id: str
    # Maps option id ("A".."D") to the candidate word for this blank.
    options: Dict[str, str]
|
||||
|
||||
|
||||
# The trailing Dict[str, Any] lets unrecognised sheet components (e.g. "part",
# "passage", "blanksPassage") pass through as raw dicts instead of failing
# validation — the mapper appends them unmodified.
Component = Union[MultipleChoiceQuestion, FillBlanksWord, Dict[str, Any]]
|
||||
|
||||
|
||||
class Sheet(BaseModel):
    """Components recognised in one OCR batch of sheet page images."""

    # 1-based batch index; set by the caller after the LLM call, hence optional.
    batch: Optional[int] = None
    components: List[Component]
|
||||
BIN
requirements.txt
BIN
requirements.txt
Binary file not shown.
1
tmp/placeholder.txt
Normal file
1
tmp/placeholder.txt
Normal file
@@ -0,0 +1 @@
|
||||
THIS FILE ONLY EXISTS TO KEEP THIS FOLDER IN THE REPO
|
||||
Reference in New Issue
Block a user