Files
encoach_backend/helper/openai_interface.py
2024-01-07 19:36:57 +00:00

225 lines
8.2 KiB
Python

import json
import openai
import os
import re
from dotenv import load_dotenv
# Load variables from a .env file (python-dotenv) before the API key is read
# from the environment on the next line.
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")
# Overall token budget: make_openai_call requests
# MAX_TOKENS - token_count - 300 completion tokens.
MAX_TOKENS = 4097
# Sampling parameters forwarded to openai.ChatCompletion.create by make_openai_call.
TOP_P = 0.9
FREQUENCY_PENALTY = 0.5
# Number of additional attempts allowed when a response lacks the expected fields.
TRY_LIMIT = 1
# Module-global retry counter mutated by make_openai_call and
# make_openai_instruct_call (declared there with `global`).
# NOTE(review): shared across all callers of this module - confirm this is
# acceptable if requests can be served concurrently.
try_count = 0
# GRADING SUMMARY
# Completion settings used by calculate_section_grade_summary.
chat_config = {'max_tokens': 1000, 'temperature': 0.2}
# Section codes accepted by calculate_grading_summary when filtering the body.
section_keys = ['reading', 'listening', 'writing', 'speaking', 'level']
# NOTE(review): not referenced in the visible part of this file; presumably the
# highest IELTS band score - confirm against its users.
grade_top_limit = 9
# Tool (function-calling) schema passed to the chat completion in
# calculate_section_grade_summary. It declares a single function whose two
# required string arguments carry the generated evaluation and suggestions.
tools = [
    {
        "type": "function",
        "function": {
            "name": "save_evaluation_and_suggestions",
            "description": "Saves the evaluation and suggestions requested by input.",
            "parameters": {
                "type": "object",
                "properties": {
                    "evaluation": {
                        "type": "string",
                        "description": "A comment on the IELTS section grade obtained in the specific section and what it could mean without suggestions.",
                    },
                    "suggestions": {
                        "type": "string",
                        "description": "A small paragraph text with suggestions on how to possibly get a better grade than the one obtained.",
                    },
                },
                "required": ["evaluation", "suggestions"],
            },
        },
    },
]
###
def process_response(input_string, quotation_check_field):
    """Extract and decode the JSON object embedded in a model response.

    Everything from the first ``{`` onwards is treated as the payload. When
    the payload appears to use single quotes around ``quotation_check_field``
    (either a quoted string value or a list value), it is normalised with
    ``parse_string`` first. Otherwise newlines and trailing commas are cleaned
    up, and payloads without any square brackets are additionally passed
    through ``parse_string_2``.

    Args:
        input_string: Raw text returned by the model.
        quotation_check_field: Name of a field expected in the payload; used
            only to detect single-quoted payloads.

    Returns:
        The decoded JSON value on success, ``input_string`` unchanged when it
        contains no ``{``, or ``None`` when the payload could not be decoded
        (the failure is printed).
    """
    if '{' not in input_string:
        return input_string

    try:
        # Keep everything from the first '{' (inclusive) onwards.
        result = input_string[input_string.index('{'):]

        # Escape the field name so regex metacharacters in it are matched
        # literally instead of corrupting (or breaking) the pattern.
        field = re.escape(quotation_check_field)
        flags = re.DOTALL | re.MULTILINE
        uses_single_quotes = (
            re.search(r"'" + field + r"':\s*'(.*?)'", result, flags)
            or re.search(r"'" + field + r"':\s*\[([^\]]+)]", result, flags)
        )
        if uses_single_quotes:
            return json.loads(parse_string(result))

        parsed_string = result.replace("\n\n", " ").replace("\n", " ")
        # Drop trailing commas, which JSON does not allow.
        parsed_string = re.sub(r',\s*]', ']', parsed_string)
        parsed_string = re.sub(r',\s*}', '}', parsed_string)
        if '[' not in parsed_string and ']' not in parsed_string:
            parsed_string = parse_string_2(parsed_string)
        return json.loads(parsed_string)
    except Exception as e:
        # Deliberately best-effort: any failure while repairing or decoding
        # the payload is reported and signalled to the caller with None.
        print(f"Invalid JSON string! Exception: {e}")
        print(f"String: {input_string}")
        return None
def parse_string(to_parse: str):
    """Convert a single-quoted, JSON-like string into double-quoted form.

    Existing double quotes are shielded as ``\\"`` first, every apostrophe
    that is not surrounded by word characters on both sides becomes a double
    quote, and the shielded quotes are then turned into apostrophes. Finally
    blank lines are collapsed to a space and trailing commas before ``]`` or
    ``}`` are removed.

    Args:
        to_parse: Text to normalise.

    Returns:
        The normalised text.
    """
    shielded = to_parse.replace('"', '\\"')
    # Apostrophes inside words (e.g. "it's") have word characters on both
    # sides and are therefore left alone by this pattern.
    requoted = re.sub(r"(?<!\w)'|'(?!\w)", '"', shielded)
    text = requoted.replace('\\"', "'").replace("\n\n", " ")
    for trailing_comma, closer in ((r',\s*]', ']'), (r',\s*}', '}')):
        text = re.sub(trailing_comma, closer, text)
    return text
def parse_string_2(to_parse: str):
    """Re-quote the values of a flat, brace-delimited key/value string.

    The text (with braces removed) is split on every ``,`` or ``:`` that
    directly follows a double quote. Pieces at odd positions are taken as
    values; each one is stripped of all double quotes and surrounding
    whitespace, wrapped in a single pair of double quotes, and substituted
    back into the original text. Lastly every ``:`` is replaced by ``: ``.

    Args:
        to_parse: Text to repair; presumably a flat object of string values
            (the caller only uses this when no square brackets are present).

    Returns:
        The repaired text.
    """
    without_braces = to_parse.replace("{", "").replace("}", "")
    pieces = re.split(r'(?<="),|(?<="):', without_braces)
    # Pieces alternate key, value, key, value, ...; only values are rewritten.
    for raw_value in pieces[1::2]:
        requoted = "\"" + raw_value.replace("\"", "").strip() + "\""
        to_parse = to_parse.replace(raw_value, requoted)
    to_parse = to_parse.replace(":", ": ")
    return to_parse
def remove_special_chars_and_escapes(input_string):
    """Strip escape sequences and non-alphanumeric characters from text.

    Backslash-escaped double quotes become apostrophes and blank lines become
    a single space before the filter runs; the filter then removes literal
    ``\\n``/``\\r``/``\\t`` sequences and every character that is not an
    ASCII letter, a digit, or whitespace.

    Args:
        input_string: Text to clean.

    Returns:
        The cleaned text.
    """
    normalised = input_string.replace("\\\"", "'").replace("\n\n", " ")
    # First alternative: literal backslash escapes; second: any other
    # character outside letters, digits and whitespace.
    return re.sub(r'(\\[nrt])|[^a-zA-Z0-9\s]', '', normalised)
def check_fields(obj, fields):
    """Report whether every name in ``fields`` is contained in ``obj``.

    Args:
        obj: Container to inspect (typically the decoded response). ``None``
            is accepted and treated as containing nothing, because
            ``process_response`` yields ``None`` for undecodable payloads.
        fields: Iterable of names that must all be present.

    Returns:
        ``True`` when all fields are present, ``False`` otherwise.
    """
    if obj is None:
        # Membership tests on None raise TypeError; a failed parse simply
        # means the expected fields are missing.
        return False
    return all(field in obj for field in fields)
def make_openai_call(model, messages, token_count, fields_to_check, temperature):
    """Run a chat completion and optionally validate its JSON payload.

    Args:
        model: Model name passed to ``openai.ChatCompletion.create``.
        messages: Chat messages to send.
        token_count: Tokens already consumed by the prompt; the completion is
            capped at ``MAX_TOKENS - token_count - 300``.
        fields_to_check: Field names the decoded response must contain, or
            ``None`` to skip decoding and return the raw text.
        temperature: Sampling temperature.

    Returns:
        The raw message content when ``fields_to_check`` is ``None`` or when
        validation still fails after ``TRY_LIMIT`` retries; otherwise the
        decoded response produced by ``process_response``.
    """
    global try_count
    result = openai.ChatCompletion.create(
        model=model,
        max_tokens=int(MAX_TOKENS - token_count - 300),
        temperature=float(temperature),
        top_p=float(TOP_P),
        frequency_penalty=float(FREQUENCY_PENALTY),
        messages=messages,
    )
    content = result["choices"][0]["message"]["content"]
    if fields_to_check is None:
        return content

    processed_response = process_response(content, fields_to_check[0])
    # process_response returns None for undecodable JSON; count that as
    # "fields missing" instead of letting the membership test raise.
    is_complete = processed_response is not None and check_fields(processed_response, fields_to_check)

    if is_complete:
        # A valid response is returned decoded even when it came from a retry.
        try_count = 0
        return processed_response
    if try_count < TRY_LIMIT:
        try_count = try_count + 1
        return make_openai_call(model, messages, token_count, fields_to_check, temperature)
    # Retries exhausted: hand back the raw text so the caller still gets the
    # model output.
    try_count = 0
    return content
def make_openai_instruct_call(model, message: str, token_count, fields_to_check, temperature):
    """Run a text completion and optionally validate its JSON payload.

    Args:
        model: Model name passed to ``openai.Completion.create``.
        message: Prompt text.
        token_count: Tokens already consumed by the prompt; the completion is
            capped at ``MAX_TOKENS - token_count - 300``.
        fields_to_check: Field names the decoded response must contain, or
            ``None`` to skip decoding and return the cleaned-up text.
        temperature: Accepted for signature parity with ``make_openai_call``;
            see the note on the request below.

    Returns:
        The completion text with blank lines collapsed and surrounding
        whitespace stripped when ``fields_to_check`` is ``None``; otherwise
        the result of ``process_response`` for the last attempt (which may be
        incomplete or ``None`` once retries are exhausted).
    """
    global try_count
    response = openai.Completion.create(
        model=model,
        prompt=message,
        max_tokens=int(MAX_TOKENS - token_count - 300),
        # NOTE(review): the `temperature` argument is not forwarded; the
        # request always uses 0.7. Confirm whether that is intentional before
        # wiring the parameter through.
        temperature=0.7,
    )["choices"][0]["text"]
    if fields_to_check is None:
        return response.replace("\n\n", " ").strip()

    processed_response = process_response(response, fields_to_check[0])
    # process_response returns None for undecodable JSON; count that as
    # "fields missing" instead of letting the membership test raise.
    is_complete = processed_response is not None and check_fields(processed_response, fields_to_check)

    if not is_complete and try_count < TRY_LIMIT:
        try_count = try_count + 1
        return make_openai_instruct_call(model, message, token_count, fields_to_check, temperature)
    try_count = 0
    return processed_response
# GRADING SUMMARY
def calculate_grading_summary(body):
    """Build an evaluation and suggestions entry for each graded section.

    Args:
        body: Request payload; its ``sections`` entries whose ``code`` is in
            ``section_keys`` and that carry ``grade`` and ``name`` are used.

    Returns:
        ``{'sections': [...]}`` with one dict per usable section holding its
        ``code``, ``name``, ``grade`` and the generated ``evaluation`` and
        ``suggestions`` (empty strings when the model supplied none). The
        list is empty when the body has no usable sections.
    """
    # The extractor may yield None when the body has no section list.
    extracted_sections = extract_existing_sections_from_body(body, section_keys) or []
    summaries = []
    for section in extracted_sections:
        openai_response_dict = calculate_section_grade_summary(section)
        summaries.append({
            'code': section['code'],
            'name': section['name'],
            'grade': section['grade'],
            # The dict comes from model output, so either key may be missing.
            'evaluation': openai_response_dict.get('evaluation', ""),
            'suggestions': openai_response_dict.get('suggestions', ""),
        })
    return {'sections': summaries}
def calculate_section_grade_summary(section):
    """Ask the chat model to comment on one section's grade.

    Sends the section's name and grade to ``gpt-3.5-turbo`` together with the
    ``tools`` schema, then decodes the reply with ``parse_openai_response``.

    Args:
        section: Mapping providing ``name`` and ``grade``.

    Returns:
        Whatever ``parse_openai_response`` produces for the completion.
    """
    prompts = (
        "You are a IELTS test section grade evaluator. You will receive a IELTS test section name and the grade obtained in the section. You should offer a comment on this grade with also suggestions on how to possibly get a better grade.",
        "Section: " + str(section['name']) + " Grade: " + str(section['grade']),
        "Speak in third person.",
        "Please save the evaluation and suggestions generated.",
    )
    # Every prompt, including the instruction, is sent with the "user" role.
    conversation = [{"role": "user", "content": prompt} for prompt in prompts]
    completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        max_tokens=chat_config['max_tokens'],
        temperature=chat_config['temperature'],
        tools=tools,
        messages=conversation,
    )
    return parse_openai_response(completion)
def parse_openai_response(response):
    """Decode the first tool call's arguments from a chat completion.

    Args:
        response: Completion result, read as
            ``response['choices'][0]['message']['tool_calls'][0]['function']['arguments']``.

    Returns:
        A dict that always contains ``evaluation`` and ``suggestions``. Both
        are empty strings when the response has no tool call, the arguments
        are empty, the arguments are not valid JSON, or the JSON is not an
        object. Keys missing from a decoded object default to an empty
        string; any extra keys are kept.
    """
    fallback = {'evaluation': "", 'suggestions': ""}
    try:
        arguments = response['choices'][0]['message']['tool_calls'][0]['function']['arguments']
    except (KeyError, IndexError, TypeError):
        # Any missing level (no choices, no tool calls, tool_calls is None...)
        # means there is nothing to decode.
        return fallback
    if not arguments:
        return fallback

    try:
        decoded = json.loads(arguments)
    except (TypeError, ValueError):
        # Model-generated arguments can be truncated or otherwise malformed.
        return fallback
    if not isinstance(decoded, dict):
        return fallback
    return {**fallback, **decoded}
def extract_existing_sections_from_body(my_dict, keys_to_extract):
    """Select the usable section entries from a request body.

    Args:
        my_dict: Mapping that may contain a ``sections`` list.
        keys_to_extract: Section codes to keep.

    Returns:
        The entries of ``my_dict['sections']`` that have a ``code`` listed in
        ``keys_to_extract`` as well as ``grade`` and ``name``. An empty list
        is returned when ``sections`` is absent, not a list, or empty, so
        callers can always iterate over the result.
    """
    sections = my_dict['sections'] if 'sections' in my_dict else None
    if not isinstance(sections, list):
        return []
    return [
        item for item in sections
        if 'code' in item and item['code'] in keys_to_extract and 'grade' in item and 'name' in item
    ]