ChatBot/main.py

230 lines
7.7 KiB
Python

import openai
import os
import time
import pyodbc
import json
import re
from dotenv import load_dotenv
load_dotenv()
client = openai.OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
def connect_db():
try:
conn = pyodbc.connect(
"DRIVER={ODBC Driver 17 for SQL Server};"
f"SERVER={os.getenv('DB_SERVER')};"
f"DATABASE={os.getenv('DB_NAME')};"
f"UID={os.getenv('DB_USER')};"
f"PWD={os.getenv('DB_PASSWORD')};"
)
return conn
except Exception as e:
print(f"Erro ao conectar à base de dados: {e}")
return None
def get_entry_by_position(position=1):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
query = f"SELECT * FROM CUnitBills ORDER BY Id ASC OFFSET {position-1} ROWS FETCH NEXT 1 ROWS ONLY"
cursor.execute(query)
row = cursor.fetchone()
if row:
columns = [column[0] for column in cursor.description]
formatted_data = "\n".join([f"- **{column}**: {value if value is not None else 'Não disponível'}" for column, value in zip(columns, row)])
return formatted_data
return "Nenhum dado encontrado."
except Exception as e:
return f"Erro ao buscar dados: {e}"
finally:
conn.close()
def get_data(atributes=None, limit=20):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
if atributes:
query = f"SELECT TOP {limit} * FROM CUnitBills WHERE Number LIKE ? ORDER BY Id ASC"
cursor.execute(query, (f"%{atributes}%",))
else:
query = f"SELECT TOP {limit} * FROM CUnitBills ORDER BY Id ASC "
cursor.execute(query)
columns = [column[0] for column in cursor.description]
rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
if rows:
formatted_rows = "\n\n".join([
"\n".join([f"- **{column}**: {row[column]}" for column in columns]) for row in rows
])
return formatted_rows
return "Nenhum dado encontrado."
except Exception as e:
return f"Erro ao buscar dados: {e}"
finally:
conn.close()
def get_filtered_data(cunit_id=None, date_billling_begin=None, date_billing_end=None, limit=2):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
query = f"""
SELECT TOP {limit} cb.CUnitId, cu.DisplayName, cu.Address, cb.DateBilllingBegin, cb.DateBillingEnd
From CUnitBills cb
Join CUnits cu ON cb.CUnitId = cu.id
"""
conditions = []
params = []
if cunit_id is not None:
conditions.append("cb.CUnitId = ?")
params.append(cunit_id)
if date_billling_begin :
conditions.append("cb.DateBilllingBegin >= ?")
params.append(date_billling_begin)
if date_billing_end :
conditions.append("cb.DateBillingEnd = ?")
params.append(date_billing_end)
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY cb.Id ASC"
#print("Query Final:", query) # Para debugging
#print("Parâmetros SQL:", params)
cursor.execute(query, params)
columns = [column[0] for column in cursor.description]
rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
if rows:
formatted_rows = "\n\n".join([
"\n".join([f"- **{column}**: {row[column]}" for column in columns]) for row in rows
])
return formatted_rows
return "Nenhum dado encontrado."
except Exception as e:
return "Erro ao buscar dados. Verifique os critérios e tente novamente."
finally:
conn.close()
def parse_user_input(user_input):
cunit_id = None
date_billling_begin = None
date_billing_end = None
total_requested = False
match_cunit_id = re.search(r"CUnitId\s*(?:de)?\s*(\d+)", user_input, re.IGNORECASE)
match_date_billling_begin = re.search(r"DateBilllingBegin\s*(?:maior que|>=)\s*([\d-]+)", user_input, re.IGNORECASE)
match_date_billing_end = re.search(r"DateBillingEnd\s*(?:igual a|=)\s*([\d-]+)", user_input, re.IGNORECASE)
match_total = re.search(r"total.*CUnitId", user_input, re.IGNORECASE)
if match_cunit_id:
cunit_id = int(match_cunit_id.group(1))
if match_date_billling_begin:
date_billling_begin = match_date_billling_begin.group(1).strip()
if match_date_billing_end:
date_billing_end = match_date_billing_end.group(1).strip()
if match_total:
total_requested = True
return cunit_id, date_billling_begin, date_billing_end, total_requested
def chat_with_gpt(prompt, attempts=3):
for i in range(attempts):
try:
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
max_tokens=300
)
return response.choices[0].message.content.strip()
except openai.RateLimitError:
if i < attempts - 1:
print("Limite de requisições atingido! Tentando novamente...")
time.sleep(10)
else:
return "Erro: Limite de requisições atingido várias vezes. Tente novamente mais tarde."
except Exception as e:
return f"Erro na API OpenAI: {e}"
def get_total_by_cunit(cunit_id, limit=10):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
query = f"""
SELECT TOP {limit} cb.Total
FROM CUnitBills cb
WHERE cb.CUnitId = ?
ORDER BY cb.Id ASC
"""
cursor.execute(query, (cunit_id,))
rows = cursor.fetchall()
if rows:
totals = "\n".join([f"- **Total**: {row[0]}" for row in rows])
return totals
return "Nenhum total encontrado para esse CUnitId."
except Exception as e:
return f"Erro ao buscar totais: {e}"
finally:
conn.close()
if __name__ == "__main__":
conn = connect_db()
if conn:
print("Conexão com a base de dados estabelecida com sucesso!")
conn.close()
else:
print("Erro ao conectar à base de dados.")
while True:
user_input = input("Eu: ")
if user_input.lower() in ["quit", "exit", "bye"]:
break
cunit_id, date_billling_begin, date_billing_end, total_requested = parse_user_input(user_input)
if total_requested and cunit_id:
data = get_total_by_cunit(cunit_id)
print(f"Chatbot: Aqui estão os totais encontrados:\n{data}")
continue
if cunit_id or date_billling_begin or date_billing_end:
data = get_filtered_data(cunit_id, date_billling_begin, date_billing_end)
print(f"Chatbot: Aqui estão os dados encontrados:\n{data}")
continue
if "dados" in user_input.lower():
data = get_data()
print(f"\nDados do SQL Server:\n{data}")
continue
response = chat_with_gpt(user_input)
print("Chatbot: ", response)