Files
encoach_backend/helper/openai_interface.py
2023-12-05 21:43:19 +00:00

144 lines
4.9 KiB
Python

import json
import openai
import os
import re
from dotenv import load_dotenv
# Load variables from a local .env file (python-dotenv) before reading the API key.
load_dotenv()
# os.getenv returns None when OPENAI_API_KEY is not set.
openai.api_key = os.getenv("OPENAI_API_KEY")
# Token budget used by make_openai_call: max_tokens = MAX_TOKENS - token_count - 300.
# NOTE(review): presumably the model's context window size -- confirm.
MAX_TOKENS = 4097
# Sampling settings forwarded to openai.ChatCompletion.create by make_openai_call.
TOP_P = 0.9
FREQUENCY_PENALTY = 0.5
# Maximum number of retries when a response lacks the expected fields.
TRY_LIMIT = 1
# Module-level retry counter, mutated through `global` by the call helpers below.
try_count = 0
def process_response(input_string, quotation_check_field):
    """Extract and parse the JSON object embedded in a model response.

    Everything before the first ``{`` is discarded. If the remainder looks
    like a single-quoted (Python-style) dict -- detected by finding
    ``'<quotation_check_field>':`` followed by a single-quoted string or a
    list -- it is normalised with ``parse_string`` before parsing. Otherwise
    newlines and trailing commas are cleaned up, and, when the text contains
    no list brackets, values are re-quoted with ``parse_string_2``.

    Args:
        input_string: Raw text returned by the model.
        quotation_check_field: Name of a field expected in the response; it
            is matched literally (regex metacharacters are escaped).

    Returns:
        The parsed JSON value on success, ``input_string`` unchanged when it
        contains no ``{``, or ``None`` when parsing fails (the failure is
        printed rather than raised).
    """
    if '{' not in input_string:
        return input_string

    try:
        # Keep everything from the first '{' (inclusive) onwards.
        result = input_string[input_string.index('{'):]

        # Escape the field name so characters such as '(' or '.' cannot
        # break or widen the pattern; raw strings avoid invalid-escape
        # warnings for \s, \[ and \].
        field = re.escape(quotation_check_field)
        flags = re.DOTALL | re.MULTILINE
        looks_single_quoted = (
            re.search(r"'" + field + r"':\s*'(.*?)'", result, flags)
            or re.search(r"'" + field + r"':\s*\[([^\]]+)]", result, flags)
        )
        if looks_single_quoted:
            return json.loads(parse_string(result))

        parsed_string = result.replace("\n\n", " ")
        parsed_string = parsed_string.replace("\n", " ")
        # Strip trailing commas, which json.loads rejects.
        parsed_string = re.sub(r',\s*]', ']', parsed_string)
        parsed_string = re.sub(r',\s*}', '}', parsed_string)
        if (parsed_string.find('[') == -1) and (parsed_string.find(']') == -1):
            # Flat object without lists: force every value into a quoted string.
            parsed_string = parse_string_2(parsed_string)
        return json.loads(parsed_string)
    except Exception as e:
        # Best-effort parser: report the problem and signal failure with None.
        print(f"Invalid JSON string! Exception: {e}")
        print(f"String: {input_string}")
        print(f"Exception: {e}")
        return None
def parse_string(to_parse: str):
    """Turn a single-quoted, Python-style dict string into JSON-style text.

    Single quotes that are not sandwiched between two word characters are
    converted to double quotes (so apostrophes such as in ``don't`` are
    kept). Double quotes present in the input end up as single quotes,
    double newlines become a space, and trailing commas before ``]`` or
    ``}`` are removed.
    """
    # Temporarily mark the original double quotes as \" so they can be told
    # apart from the double quotes introduced by the next step.
    text = to_parse.replace("\"", "\\\"")
    text = re.sub(r"(?<!\w)'|'(?!\w)", '"', text)
    # Every \" sequence left at this point becomes a single quote.
    text = text.replace("\\\"", "'").replace("\n\n", " ")
    for trailing_comma, closer in ((r',\s*]', ']'), (r',\s*}', '}')):
        text = re.sub(trailing_comma, closer, text)
    return text
def parse_string_2(to_parse: str):
    """Re-quote the values of a flat, JSON-like object string.

    The text (with braces removed) is split on every ``,`` or ``:`` that
    directly follows a double quote; the odd-numbered pieces are taken to be
    the values. Each value is stripped of inner double quotes and
    surrounding whitespace, wrapped in one pair of double quotes, and
    substituted back into the original text. Finally every ``:`` is
    expanded to ``": "``.
    """
    without_braces = to_parse.replace("{", "").replace("}", "")
    pieces = re.split(r'(?<="),|(?<="):', without_braces)

    # Pieces alternate key, value, key, value, ... so values sit at odd indices.
    raw_values = pieces[1::2]

    # Substitutions are applied one after another on the evolving text.
    for raw in raw_values:
        quoted = "\"" + raw.replace("\"", "").strip() + "\""
        to_parse = to_parse.replace(raw, quoted)

    return to_parse.replace(":", ": ")
def remove_special_chars_and_escapes(input_string):
    """Return ``input_string`` reduced to ASCII letters, digits and whitespace.

    Backslash-escaped double quotes are first turned into single quotes and
    double newlines into a space; then literal ``\\n``/``\\r``/``\\t`` escape
    sequences and every character outside ``a-zA-Z0-9`` and whitespace are
    deleted (this also drops the single quotes produced by the first step).
    """
    normalized = input_string.replace("\\\"", "'").replace("\n\n", " ")
    # First alternative: literal backslash escapes; second: any special character.
    return re.sub(r'(\\[nrt])|[^a-zA-Z0-9\s]', '', normalized)
def check_fields(obj, fields):
    """Return True when every name in ``fields`` is contained in ``obj``.

    Args:
        obj: The parsed response (typically a dict). ``None`` -- which
            ``process_response`` yields when parsing fails -- is treated as
            missing every field instead of raising ``TypeError``.
        fields: Iterable of field names to look for.

    Returns:
        bool: False if ``obj`` is None or any field is absent, else True.
    """
    if obj is None:
        return False
    return all(field in obj for field in fields)
def make_openai_call(model, messages, token_count, fields_to_check, temperature):
    """Send a chat completion request and optionally validate the JSON reply.

    Args:
        model: Model name passed to ``openai.ChatCompletion.create``.
        messages: Chat messages for the request.
        token_count: Tokens already used by the prompt; the completion budget
            is ``MAX_TOKENS - token_count - 300``.
        fields_to_check: Field names the parsed reply must contain, or None
            to skip parsing and return the raw text.
        temperature: Sampling temperature (converted with ``float``).

    Returns:
        The raw reply text when ``fields_to_check`` is None; the parsed reply
        when it contains all expected fields; otherwise, once the retry
        limit (``TRY_LIMIT``) is exhausted, the raw text of the last reply.
    """
    global try_count
    result = openai.ChatCompletion.create(
        model=model,
        max_tokens=int(MAX_TOKENS - token_count - 300),
        temperature=float(temperature),
        top_p=float(TOP_P),
        frequency_penalty=float(FREQUENCY_PENALTY),
        messages=messages
    )
    content = result["choices"][0]["message"]["content"]
    if fields_to_check is None:
        return content

    processed_response = process_response(content, fields_to_check[0])

    # A None result means the reply could not be parsed; treat it as a failed
    # check rather than letting the membership test raise TypeError.
    if processed_response is not None and check_fields(processed_response, fields_to_check):
        # Success is checked first so that a reply validated on a retry is
        # returned parsed, exactly like a reply validated on the first try.
        try_count = 0
        return processed_response

    if try_count < TRY_LIMIT:
        try_count = try_count + 1
        return make_openai_call(model, messages, token_count, fields_to_check, temperature)

    # Retries exhausted: hand back the unparsed text of the last reply.
    try_count = 0
    return content
def make_openai_instruct_call(model, message: str, token_count, fields_to_check, temperature):
    """Send a text completion request and optionally validate the JSON reply.

    Args:
        model: Model name passed to ``openai.Completion.create``.
        message: Prompt text.
        token_count: Tokens already used by the prompt; the completion budget
            is ``MAX_TOKENS - token_count - 300``.
        fields_to_check: Field names the parsed reply must contain, or None
            to skip parsing and return the cleaned raw text.
        temperature: Sampling temperature. Previously ignored in favour of a
            hard-coded 0.7; it is now honoured, and 0.7 is used only when
            None is passed.

    Returns:
        The stripped reply text (double newlines collapsed to a space) when
        ``fields_to_check`` is None; otherwise the result of
        ``process_response`` for the last attempt, which may be None or
        incomplete if the retry limit (``TRY_LIMIT``) was exhausted.
    """
    global try_count
    response = openai.Completion.create(
        model=model,
        prompt=message,
        max_tokens=int(MAX_TOKENS - token_count - 300),
        temperature=0.7 if temperature is None else float(temperature)
    )["choices"][0]["text"]
    if fields_to_check is None:
        return response.replace("\n\n", " ").strip()

    processed_response = process_response(response, fields_to_check[0])

    # A None result means the reply could not be parsed; count it as a failed
    # check instead of letting the membership test raise TypeError.
    fields_present = (
        processed_response is not None
        and check_fields(processed_response, fields_to_check)
    )
    if not fields_present and try_count < TRY_LIMIT:
        try_count = try_count + 1
        return make_openai_instruct_call(model, message, token_count, fields_to_check, temperature)

    try_count = 0
    return processed_response