232 lines
8.2 KiB
Python
232 lines
8.2 KiB
Python
import json
|
|
import openai
|
|
import os
|
|
import re
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
# Run python-dotenv's loader first so that the environment lookup below can
# see values it provides.
load_dotenv()
# The lookup yields None when OPENAI_API_KEY is not set.
openai.api_key = os.environ.get("OPENAI_API_KEY")
|
|
|
|
# Total token budget; completion requests derive their ``max_tokens`` from it.
MAX_TOKENS = 4097
# Sampling parameters forwarded to chat completion requests.
TOP_P = 0.9
FREQUENCY_PENALTY = 0.5

# Maximum number of retries after a reply that fails the field check.
TRY_LIMIT = 1

# Module-level retry counter, initialised to zero.
try_count = 0

# GRADING SUMMARY
chat_config = {
    'max_tokens': 1000,
    'temperature': 0.2,
}
# Section codes accepted when extracting sections from a request body.
section_keys = [
    'reading',
    'listening',
    'writing',
    'speaking',
    'level',
]
grade_top_limit = 9
|
|
|
|
# JSON-schema fragments for the two string arguments of the tool below.
_evaluation_argument = {
    "type": "string",
    "description": "A comment on the IELTS section grade obtained in the specific section and what it could mean without suggestions.",
}
_suggestions_argument = {
    "type": "string",
    "description": "A small paragraph text with suggestions on how to possibly get a better grade than the one obtained.",
}

# Tool definitions passed to the chat completion request; a single function
# tool whose two string arguments are both required.
tools = [
    {
        "type": "function",
        "function": {
            "name": "save_evaluation_and_suggestions",
            "description": "Saves the evaluation and suggestions requested by input.",
            "parameters": {
                "type": "object",
                "properties": {
                    "evaluation": _evaluation_argument,
                    "suggestions": _suggestions_argument,
                },
                "required": ["evaluation", "suggestions"],
            },
        },
    },
]
|
|
|
|
|
|
###
|
|
|
|
def process_response(input_string, quotation_check_field):
    """Best-effort conversion of a text reply into a parsed JSON value.

    Args:
        input_string: Raw text that may contain a JSON-like object.
        quotation_check_field: Field name used to detect single-quoted
            (Python-style) output, e.g. ``'field': '...'`` or
            ``'field': [...]``.

    Returns:
        * ``input_string`` unchanged when it contains no ``{``;
        * the value produced by ``json.loads`` after clean-up when parsing
          succeeds;
        * ``None`` when the text could not be parsed (the error is printed).
    """
    if '{' not in input_string:
        return input_string

    try:
        # Keep everything from the first '{' onwards (inclusive).
        result = input_string[input_string.index('{'):]

        # The field name is escaped so regex metacharacters in it are matched
        # literally; every fragment is a raw string so ``\s`` / ``\[`` are
        # regex escapes rather than invalid string escapes.
        field = re.escape(quotation_check_field)
        flags = re.DOTALL | re.MULTILINE
        single_quoted_value = r"'" + field + r"':\s*'(.*?)'"
        single_quoted_list = r"'" + field + r"':\s*\[([^\]]+)]"

        if re.search(single_quoted_value, result, flags) or \
                re.search(single_quoted_list, result, flags):
            # Single-quoted output: convert the quoting before parsing.
            return json.loads(parse_string(result))

        parsed_string = result.replace("\n\n", " ").replace("\n", " ")
        # Drop trailing commas, which strict JSON does not allow.
        parsed_string = re.sub(r',\s*]', ']', parsed_string)
        parsed_string = re.sub(r',\s*}', '}', parsed_string)

        # Only flat objects (no square brackets) go through the
        # value re-quoting pass.
        if '[' not in parsed_string and ']' not in parsed_string:
            parsed_string = parse_string_2(parsed_string)

        return json.loads(parsed_string)
    except Exception as e:
        # Deliberately broad: parsing is best-effort and must not crash the
        # caller, which treats ``None`` as "could not parse".
        print(f"Invalid JSON string! Exception: {e}")
        print(f"String: {input_string}")
        return None
|
|
|
|
|
|
def parse_string(to_parse: str):
    """Rewrite single-quoted, Python-style text into JSON-style quoting.

    Existing double quotes end up as single quotes, single quotes that are
    not surrounded by word characters on both sides become double quotes,
    blank-line breaks become spaces and trailing commas before ``]`` / ``}``
    are removed.
    """
    # Mark the original double quotes so the quote swap cannot confuse them
    # with the delimiters it introduces.
    text = to_parse.replace("\"", "\\\"")

    # A quote flanked by word characters on both sides (an apostrophe such as
    # in "it's") is left alone; any other single quote becomes a double quote.
    text = re.sub(r"(?<!\w)'|'(?!\w)", '"', text)

    # The marked original double quotes become single quotes.
    text = text.replace("\\\"", "'").replace("\n\n", " ")

    for trailing_comma, closer in ((r',\s*]', ']'), (r',\s*}', '}')):
        text = re.sub(trailing_comma, closer, text)

    return text
|
|
|
|
|
|
def parse_string_2(to_parse: str):
    """Wrap every value of a flat ``{"key": value}`` text in double quotes.

    The text (braces removed) is split on each ``,`` or ``:`` that directly
    follows a double quote; the pieces at odd positions are treated as
    values. Each such piece is replaced in the original text by its
    quote-stripped, whitespace-trimmed form wrapped in double quotes, and
    finally every ``:`` is followed by a space.
    """
    without_braces = to_parse.replace("{", "").replace("}", "")
    pieces = re.split(r'(?<="),|(?<="):', without_braces)

    # Pieces alternate key, value, key, value, ...; only values are rewritten.
    for raw_value in pieces[1::2]:
        quoted_value = "\"" + raw_value.replace("\"", "").strip() + "\""
        to_parse = to_parse.replace(raw_value, quoted_value)

    return to_parse.replace(":", ": ")
|
|
|
|
|
|
def remove_special_chars_and_escapes(input_string):
    """Strip escape sequences and non-alphanumeric characters from text.

    Blank-line breaks are first collapsed to a single space. Then literal
    backslash escapes (backslash followed by ``n``, ``r`` or ``t``) and every
    character that is not an ASCII letter, a digit or whitespace are removed.
    """
    normalised = input_string.replace("\\\"", "'").replace("\n\n", " ")
    # First alternative: a literal backslash + n/r/t pair.
    # Second alternative: any single character outside letters/digits/space.
    return re.sub(r'(\\[nrt])|[^a-zA-Z0-9\s]', '', normalised)
|
|
|
|
|
|
def check_fields(obj, fields):
    """Return True when every entry of ``fields`` is contained in ``obj``.

    Args:
        obj: Container tested with the ``in`` operator (a dict in the usual
            case), or ``None`` when an earlier parsing step failed.
        fields: Iterable of required field names.

    Returns:
        ``True`` if all fields are present (or ``fields`` is empty),
        ``False`` otherwise. A ``None`` ``obj`` never contains a field, so it
        yields ``False`` instead of raising ``TypeError``.
    """
    return all(obj is not None and field in obj for field in fields)
|
|
|
|
|
|
def make_openai_call(model, messages, token_count, fields_to_check, temperature):
    """Request a chat completion and, optionally, parse and validate it.

    Args:
        model: Model name forwarded to the API.
        messages: Chat messages forwarded to the API.
        token_count: Tokens already used by the prompt; the completion budget
            is ``MAX_TOKENS - token_count - 300``.
        fields_to_check: Field names the parsed reply must contain, or
            ``None`` to skip parsing and return the raw text.
        temperature: Sampling temperature (converted with ``float``).

    Returns:
        * the raw message content when ``fields_to_check`` is ``None``;
        * the parsed reply as soon as one attempt contains every required
          field;
        * the raw content of the last reply when all attempts fail the check.

    The request is attempted once plus at most ``TRY_LIMIT`` retries. The
    attempt counter is local, so an exception raised by the API cannot leave
    stale retry state behind for later calls.
    """
    content = None

    # One initial attempt plus at most TRY_LIMIT retries.
    for _ in range(max(TRY_LIMIT, 0) + 1):
        result = openai.ChatCompletion.create(
            model=model,
            max_tokens=int(MAX_TOKENS - token_count - 300),
            temperature=float(temperature),
            top_p=float(TOP_P),
            frequency_penalty=float(FREQUENCY_PENALTY),
            messages=messages,
        )
        content = result["choices"][0]["message"]["content"]

        if fields_to_check is None:
            return content

        processed_response = process_response(content, fields_to_check[0])

        # A failed parse yields None; treat it as "fields missing" rather
        # than handing None to the membership check.
        if processed_response is not None and check_fields(processed_response, fields_to_check):
            return processed_response

    # Every attempt failed validation: fall back to the last raw reply.
    return content
|
|
|
|
|
|
def make_openai_instruct_call(model, message: str, token_count, fields_to_check, temperature):
    """Request a text completion and, optionally, parse and validate it.

    Args:
        model: Model name forwarded to the API.
        message: Prompt text.
        token_count: Tokens already used by the prompt; the completion budget
            is ``MAX_TOKENS - token_count - 300``.
        fields_to_check: Field names the parsed reply must contain, or
            ``None`` to skip parsing.
        temperature: Accepted for signature parity with the chat helper.
            NOTE(review): this value is not forwarded; the request always
            uses 0.7. Confirm with callers before changing.

    Returns:
        * the reply with blank-line breaks collapsed and surrounding
          whitespace stripped when ``fields_to_check`` is ``None``;
        * otherwise the parsed reply of the first attempt that contains every
          required field, or the parsed result of the last attempt (possibly
          ``None`` or incomplete) when all attempts fail the check.

    The request is attempted once plus at most ``TRY_LIMIT`` retries, tracked
    with a local counter so an API exception cannot leak retry state.
    """
    processed_response = None

    # One initial attempt plus at most TRY_LIMIT retries.
    for _ in range(max(TRY_LIMIT, 0) + 1):
        response = openai.Completion.create(
            model=model,
            prompt=message,
            max_tokens=int(MAX_TOKENS - token_count - 300),
            temperature=0.7,
        )["choices"][0]["text"]

        if fields_to_check is None:
            return response.replace("\n\n", " ").strip()

        processed_response = process_response(response, fields_to_check[0])

        # A failed parse yields None; treat it as "fields missing" rather
        # than handing None to the membership check.
        if processed_response is not None and check_fields(processed_response, fields_to_check):
            return processed_response

    return processed_response
|
|
|
|
|
|
# GRADING SUMMARY
|
|
def calculate_grading_summary(body):
    """Build an evaluation and suggestions entry for each section in ``body``.

    Args:
        body: Request payload; sections are taken from ``body['sections']``
            and limited to the codes listed in ``section_keys``.

    Returns:
        ``{'sections': [...]}`` where each entry carries the section's
        ``code``, ``name`` and ``grade`` plus the generated ``evaluation`` and
        ``suggestions`` texts. The list is empty when no section qualifies.
    """
    # The extractor may yield None when the payload has no usable sections.
    extracted_sections = extract_existing_sections_from_body(body, section_keys) or []

    summaries = []
    for section in extracted_sections:
        feedback = calculate_section_grade_summary(section)
        if not isinstance(feedback, dict):
            # Unexpected reply shape: fall back to empty texts.
            feedback = {}

        summaries.append({
            'code': section['code'],
            'name': section['name'],
            'grade': section['grade'],
            # The reply is model-generated, so either key may be missing.
            'evaluation': feedback.get('evaluation', ""),
            'suggestions': feedback.get('suggestions', ""),
        })

    return {'sections': summaries}
|
|
|
|
|
|
def calculate_section_grade_summary(section):
    """Ask the chat model to comment on one section's grade.

    Args:
        section: Mapping providing at least ``name`` and ``grade``.

    Returns:
        Whatever ``parse_openai_response`` extracts from the completion.
    """
    section_line = "Section: " + str(section['name']) + " Grade: " + str(section['grade'])

    # All instructions are sent as user-role messages, in this order.
    conversation = [
        {
            "role": "user",
            "content": "You are a IELTS test section grade evaluator. You will receive a IELTS test section name and the grade obtained in the section. You should offer a comment on this grade with also suggestions on how to possibly get a better grade.",
        },
        {
            "role": "user",
            "content": section_line,
        },
        {"role": "user", "content": "Speak in third person."},
        {"role": "user", "content": "Please save the evaluation and suggestions generated."},
    ]

    completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        max_tokens=chat_config['max_tokens'],
        temperature=chat_config['temperature'],
        tools=tools,
        messages=conversation,
    )

    return parse_openai_response(completion)
|
|
|
|
|
|
def parse_openai_response(response):
    """Extract the tool-call arguments from a chat completion response.

    Args:
        response: Mapping-like completion result; the arguments are read from
            ``response['choices'][0]['message']['tool_calls'][0]['function']['arguments']``.

    Returns:
        A dict that always contains ``evaluation`` and ``suggestions``. Values
        come from the decoded arguments when available; any key that is
        missing defaults to an empty string. When the response has no tool
        call, the arguments are empty or not valid JSON, or the decoded value
        is not an object, both texts are empty strings.
    """
    summary = {'evaluation': "", 'suggestions': ""}

    try:
        tool_calls = response['choices'][0]['message']['tool_calls']
        arguments = tool_calls[0]['function']['arguments']
    except (KeyError, IndexError, TypeError):
        # No choices, no message, no tool call, or an unexpected shape.
        return summary

    if not arguments:
        return summary

    try:
        decoded = json.loads(arguments)
    except (TypeError, ValueError):
        # Arguments are model-generated text and may not be valid JSON.
        return summary

    if isinstance(decoded, dict):
        summary.update(decoded)

    return summary
|
|
|
|
|
|
def extract_existing_sections_from_body(my_dict, keys_to_extract):
    """Return the well-formed sections of ``my_dict`` with an accepted code.

    Args:
        my_dict: Mapping that may hold a list under ``'sections'``.
        keys_to_extract: Accepted values for a section's ``'code'``.

    Returns:
        A list of the section items that have ``code``, ``grade`` and ``name``
        and whose ``code`` is in ``keys_to_extract``. An empty list is
        returned when ``'sections'`` is absent, not a list, or empty, so
        callers can always iterate the result.
    """
    if 'sections' not in my_dict or not isinstance(my_dict['sections'], list):
        return []

    return [
        item for item in my_dict['sections']
        if 'code' in item
        and item['code'] in keys_to_extract
        and 'grade' in item
        and 'name' in item
    ]
|