2025-05-12 18:24:27 +01:00

643 lines
24 KiB
Python

import openai
import os
import time
import pyodbc
import json
import re
from dotenv import load_dotenv
from datetime import datetime
load_dotenv()
client = openai.OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
def connect_db():
try:
conn = pyodbc.connect(
"DRIVER={ODBC Driver 17 for SQL Server};"
f"SERVER={os.getenv('DB_SERVER')};"
f"DATABASE={os.getenv('DB_NAME')};"
f"UID={os.getenv('DB_USER')};"
f"PWD={os.getenv('DB_PASSWORD')};"
)
return conn
except Exception as e:
print(f"Erro ao conectar à base de dados: {e}")
return None
def get_data(atributes=None, limit=20):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
if atributes:
query = f"SELECT TOP {limit} * FROM CUnitBills WHERE Number LIKE ? ORDER BY Id ASC"
cursor.execute(query, (f"%{atributes}%",))
else:
query = f"SELECT TOP {limit} * FROM CUnitBills ORDER BY Id ASC "
cursor.execute(query)
columns = [column[0] for column in cursor.description]
rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
if rows:
formatted_rows = "\n\n".join([
"\n".join([f"- **{column}**: {row[column]}" for column in columns]) for row in rows
])
return formatted_rows
return "Nenhum dado encontrado."
except Exception as e:
return f"Erro ao buscar dados: {e}"
finally:
conn.close()
def get_filtered_data(cunit_id=None, date_billling_begin=None, date_billing_end=None, limit=2):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
query = f"""
SELECT TOP {limit} cb.CUnitId, cu.DisplayName, cu.Address, cb.DateBilllingBegin, cb.DateBillingEnd
From CUnitBills cb
Join CUnits cu ON cb.CUnitId = cu.id
"""
conditions = []
params = []
if cunit_id is not None:
conditions.append("cb.CUnitId = ?")
params.append(cunit_id)
if date_billling_begin :
conditions.append("cb.DateBilllingBegin >= ?")
params.append(date_billling_begin)
if date_billing_end :
conditions.append("cb.DateBillingEnd = ?")
params.append(date_billing_end)
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY cb.Id ASC"
#print("Query Final:", query) # Para debugging
#print("Parâmetros SQL:", params)
cursor.execute(query, params)
columns = [column[0] for column in cursor.description]
rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
if rows:
formatted_rows = "\n\n".join([
"\n".join([f"- **{column}**: {row[column]}" for column in columns]) for row in rows
])
return formatted_rows
return "Nenhum dado encontrado."
except Exception as e:
return "Erro ao buscar dados. Verifique os critérios e tente novamente."
finally:
conn.close()
def parse_user_input(user_input):
cunit_id = None
date_billling_begin = None
date_billing_end = None
total_requested = False
match_cunit_id = re.search(r"CUnitId\s*(?:de)?\s*(\d+)", user_input, re.IGNORECASE)
match_date_billling_begin = re.search(r"DateBilllingBegin\s*(?:maior que|>=?)\s*([\d]{4}-[\d]{2}-[\d]{2})", user_input, re.IGNORECASE)
match_date_billing_end = re.search(r"DateBillingEnd\s*(?:igual a|=)\s*([\d]{4}-[\d]{2}-[\d]{2})", user_input, re.IGNORECASE)
match_total = re.search(r"total\s+do\s+.*?CUnitId\s*(\d+)", user_input, re.IGNORECASE)
if match_cunit_id:
cunit_id = int(match_cunit_id.group(1))
if match_date_billling_begin:
date_billling_begin = match_date_billling_begin.group(1).strip()
if match_date_billing_end:
date_billing_end = match_date_billing_end.group(1).strip()
if match_total:
total_requested = True
return cunit_id, date_billling_begin, date_billing_end, total_requested
def chat_with_gpt(prompt, attempts=3):
for i in range(attempts):
try:
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
max_tokens=300
)
return response.choices[0].message.content.strip()
except openai.RateLimitError:
if i < attempts - 1:
print("Limite de requisições atingido! Tentando novamente...")
time.sleep(10)
else:
return "Erro: Limite de requisições atingido várias vezes. Tente novamente mais tarde."
except Exception as e:
return f"Erro genérico com a API OpenAI: {type(e).__name__} - {e}"
def get_price_comparison():
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
cursor = conn.cursor()
query = """
SELECT
cb.CUnitId,
cu.DisplayName,
cbi.UnitPrice AS PrecoFaturado,
tp.EnergyPrice AS PrecoContratualizado,
(cbi.UnitPrice - tp.EnergyPrice) AS Diferenca
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
JOIN CUnits cu ON cb.CUnitId = cu.Id
JOIN TariffPrices tp ON cbi.ProductsId = tp.ProductId
ORDER BY Diferenca DESC;
"""
# futuro:
# CASE WHEN tp.EnergyPrice != 0 THEN ((cbi.UnitPrice - tp.EnergyPrice) / tp.EnergyPrice) * 100 ELSE NULL END AS Percentual
cursor.execute(query)
rows = cursor.fetchall()
conn.close()
if rows:
return "\n".join([f"- {row}" for row in rows])
return "Nenhum dado encontrado."
except Exception as e:
conn.close()
return f"Erro ao buscar dados: {e}"
def get_top_consumers_by_month_year(month=None, year = None):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
cursor = conn.cursor()
now = datetime.now()
month = month or now.month
year = year or now.year
query = """
SELECT TOP 4
cb.CUnitId,
cu.DisplayName,
SUM(cbi.Quantity) AS ConsumoTotal
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
JOIN CUnits cu ON cb.CUnitId = cu.Id
WHERE YEAR(cb.DateBilllingBegin) = ?
AND MONTH(cb.DateBilllingBegin) = ?
GROUP BY cb.CUnitId, cu.DisplayName
ORDER BY ConsumoTotal DESC;
"""
cursor.execute(query, (year, month))
rows = cursor.fetchall()
conn.close()
if rows:
return "\n".join([f"- {row}" for row in rows])
return "Nenhum dado encontrado."
except Exception as e:
conn.close()
return f"Erro ao buscar dados: {e}"
def get_top_consumers(current=True, year=None):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
cursor = conn.cursor()
if current:
query = """
SELECT TOP 4
cb.CUnitId,
cu.DisplayName,
SUM(cbi.Quantity) AS ConsumoTotal
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
JOIN CUnits cu ON cb.CUnitId = cu.Id
WHERE YEAR(cb.DateBilllingBegin) = YEAR(GETDATE())
AND MONTH(cb.DateBilllingBegin) = MONTH(GETDATE())
GROUP BY cb.CUnitId, cu.DisplayName
ORDER BY ConsumoTotal DESC;
"""
cursor.execute(query)
else:
if year:
query = """
SELECT TOP 4
cb.CUnitId,
cu.DisplayName,
SUM(cbi.Quantity) AS ConsumoTotal
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
JOIN CUnits cu ON cb.CUnitId = cu.Id
WHERE YEAR(cb.DateBilllingBegin) = ?
AND MONTH(cb.DateBilllingBegin) BETWEEN 1 AND 4
GROUP BY cb.CUnitId, cu.DisplayName
ORDER BY ConsumoTotal DESC;
"""
cursor.execute(query, (year,))
else:
query = """
SELECT TOP 4
cb.CUnitId,
cu.DisplayName,
SUM(cbi.Quantity) AS ConsumoTotal
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
JOIN CUnits cu ON cb.CUnitId = cu.Id
WHERE MONTH(cb.DateBilllingBegin) BETWEEN 1 AND 4
GROUP BY cb.CUnitId, cu.DisplayName
ORDER BY ConsumoTotal DESC;
"""
cursor.execute(query)
rows = cursor.fetchall()
conn.close()
if rows:
return "\n".join([f"- {row}" for row in rows])
return "Nenhum dado encontrado."
except Exception as e:
conn.close()
return f"Erro ao buscar dados: {e}"
def get_total_by_cunit(cunit_id, limit=10):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
query = f"""
SELECT TOP {limit} cb.Total
FROM CUnitBills cb
WHERE cb.CUnitId = ?
ORDER BY cb.Id ASC
"""
cursor.execute(query, (cunit_id,))
rows = cursor.fetchall()
if rows:
totals = "\n".join([f"- **Total**: {row[0]}" for row in rows])
return totals
return "Nenhum total encontrado para esse CUnitId."
except Exception as e:
return f"Erro ao buscar totais: {e}"
finally:
conn.close()
def compare_current_vs_previous_year(month=None, year=None):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
now = datetime.now()
month = month or now.month
year = year or now.year
query_current = """
SELECT SUM(cb.Total) AS TotalAtual
FROM CUnitBills cb
WHERE YEAR(cb.DateBilllingBegin) = ?
AND MONTH(cb.DateBilllingBegin) = ?
"""
cursor.execute(query_current, (year, month))
total_atual = cursor.fetchone()[0] or 0
query_previous = """
SELECT SUM(cb.Total) AS TotalAnterior
FROM CUnitBills cb
WHERE YEAR(cb.DateBilllingBegin) = ?
AND MONTH(cb.DateBilllingBegin) = ?
"""
cursor.execute(query_previous, (year - 1, month))
total_anterior = cursor.fetchone()[0] or 0
if total_atual > total_anterior:
return f"Estamos a pagar mais em {month:02}/{year} ({total_atual:.2f}) do que no mesmo período do ano passado ({total_anterior:.2f})."
elif total_atual < total_anterior:
return f"Estamos a pagar menos em {month:02}/{year} ({total_atual:.2f}) do que no mesmo período do ano passado ({total_anterior:.2f})."
else:
return f"Estamos a pagar exatamente o mesmo em {month:02}/{year} que no mesmo período do ano passado."
except Exception as e:
return f"Erro ao comparar os totais: {e}"
finally:
conn.close()
def compare_kwh_current_vs_previous_year(month=None, year=None):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
now = datetime.now()
month = month or now.month
year = year or now.year
query_current = """
SELECT SUM(cbi.Quantity) AS TotalAtual
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
WHERE YEAR(cb.DateBilllingBegin) = ?
AND MONTH(cb.DateBilllingBegin) = ?
"""
cursor.execute(query_current, (year, month))
total_atual = cursor.fetchone()[0] or 0
total_atual = float(total_atual)
query_previous = """
SELECT SUM(cbi.Quantity) AS TotalAnterior
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
WHERE YEAR(cb.DateBilllingBegin) = ?
AND MONTH(cb.DateBilllingBegin) = ?
"""
cursor.execute(query_previous, (year - 1, month))
total_anterior = cursor.fetchone()[0] or 0
total_anterior = float(total_anterior)
if abs(total_atual - total_anterior) <= 0.05 * total_anterior:
return f"O total de kwh no mês {month:02}/{year} ({total_atual:.2f}) está semelhante ao do mesmo mês do ano anterior ({total_anterior:.2f})."
elif total_atual > total_anterior:
return f"O total de kwh no mês {month:02}/{year} ({total_atual:.2f}) é maior que no mesmo mês do ano anterior ({total_anterior:.2f})."
else:
return f"O total de kwh no mês {month:02}/{year} ({total_atual:.2f}) é menor que no mesmo mês do ano anterior ({total_anterior:.2f})."
except Exception as e:
return f"Erro ao comparar os totais de kwh: {e}"
finally:
conn.close()
def get_invoices_by_month_year(month=None, year=None):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
now = datetime.now()
month = month or now.month
year = year or now.year
query = """
SELECT COUNT(*) AS TotalFaturas
FROM CUnitBills
WHERE YEAR(RevisionDate) = ?
AND MONTH(RevisionDate) = ?
"""
cursor.execute(query, (year, month))
total_faturas = cursor.fetchone()[0]
if total_faturas > 0:
return f"Foram inseridas {total_faturas} faturas no mês {month:02}/{year}."
else:
return f"Não foram inseridas faturas no mês {month:02}/{year}."
except Exception as e:
return f"Erro ao buscar as faturas: {e}"
finally:
conn.close()
def get_invoices_from_inactive_units():
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
query = """
SELECT cb.Id AS FaturaId, cu.DisplayName AS Instalacao, cb.DateBilllingBegin, cb.DateBillingEnd, cb.Total
FROM CUnitBills cb
JOIN CUnits cu ON cb.CUnitId = cu.Id
WHERE cu.Active = 0
"""
cursor.execute(query)
rows = cursor.fetchall()
if rows:
formatted_rows = "\n".join([
f"- **FaturaId**: {row[0]}, **Instalação**: {row[1]}, **Início**: {row[2]}, **Fim**: {row[3]}, **Total**: {row[4]:.2f}"
for row in rows
])
return f"Faturas de instalações inativas:\n{formatted_rows}"
else:
return "Não foram encontradas faturas de instalações inativas."
except Exception as e:
return f"Erro ao buscar faturas de instalações inativas: {e}"
finally:
conn.close()
def get_total_kwh_by_building_type(building_type=None):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
query = """
SELECT
ct.DisplayName AS TipoEdificio,
SUM(cbi.Quantity) AS TotalKwh
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
JOIN CUnits cu ON cb.CUnitId = cu.Id
JOIN CUnitTypes ct ON cu.CUnitType = ct.Id
"""
if building_type:
query += " WHERE ct.DisplayName = ?"
query += " GROUP BY ct.DisplayName ORDER BY TotalKwh DESC"
cursor.execute(query, (building_type,))
else:
query += " GROUP BY ct.DisplayName ORDER BY TotalKwh DESC"
cursor.execute(query)
rows = cursor.fetchall()
if rows:
formatted_rows = "\n".join([
f"- **Tipo de Edifício**: {row[0]}, **Total kWh**: {row[1]:.2f}"
for row in rows
])
return formatted_rows
else:
if building_type:
return f"Nenhum dado encontrado para o tipo de edifício '{building_type}'. Verifique se o nome está correto."
return "Nenhum dado encontrado para os tipos de edifício."
except Exception as e:
return f"Erro ao buscar os dados: {e}"
finally:
conn.close()
if __name__ == "__main__":
conn = connect_db()
if conn:
print("Conexão com a base de dados estabelecida com sucesso!")
conn.close()
else:
print("Erro ao conectar à base de dados.")
month_map = {
"janeiro": 1,
"fevereiro": 2,
"março": 3,
"março": 3,
"abril": 4,
"maio": 5,
"junho": 6,
"julho": 7,
"agosto": 8,
"setembro": 9,
"outubro": 10,
"novembro": 11,
"dezembro": 12
}
while True:
user_input = input("Eu: ")
if user_input.lower() in ["quit", "exit", "bye"]:
break
cunit_id, date_billling_begin, date_billing_end, total_requested = parse_user_input(user_input)
if total_requested and cunit_id:
data = get_total_by_cunit(cunit_id)
print(f"Chatbot: Aqui estão os totais encontrados:\n{data}")
continue
if cunit_id or date_billling_begin or date_billing_end:
data = get_filtered_data(cunit_id, date_billling_begin, date_billing_end)
print(f"Chatbot: Aqui estão os dados encontrados:\n{data}")
continue
if "preços faturados" in user_input.lower():
data = get_price_comparison()
print(f"Chatbot: Aqui está a comparação dos preços:\n{data}")
continue
if re.search(r"mês atual.*igual período.*ano anterior", user_input.lower()):
data = compare_current_vs_previous_year()
print(f"Chatbot: {data}")
continue
if re.search(r"mês.*igual período.*ano anterior", user_input.lower()):
match = re.search(
r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?",
user_input.lower()
)
if match:
mes_input = match.group(1).strip().lower()
ano = int(match.group(2)) if match.group(2) else datetime.now().year
if mes_input.isdigit():
mes = int(mes_input)
else:
mes = month_map.get(mes_input)
if not mes:
print("Chatbot: Mês não reconhecido. Tenta novamente.")
continue
else:
mes = datetime.now().month
ano = datetime.now().year
data = compare_current_vs_previous_year(month=mes, year=ano)
print(f"Chatbot: {data}")
continue
if "homólogo" in user_input.lower():
match = re.search(r"homólogo.*?(\d{4})", user_input.lower())
ano = int(match.group(1)) if match else None
data = get_top_consumers(current=False, year=ano)
if ano:
print(f"Chatbot: Aqui estão as instalações com maior consumo no período homólogo de {ano}:\n{data}")
else:
print(f"Chatbot: Aqui estão as instalações com maior consumo no período homólogo atual:\n{data}")
continue
if re.search(r"total de kwh.*mês.*ano anterior", user_input.lower()):
match = re.search(
r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?",
user_input.lower()
)
if match:
mes_input = match.group(1).strip().lower()
ano = int(match.group(2)) if match.group(2) else datetime.now().year
if mes_input.isdigit():
mes = int(mes_input)
else:
mes = month_map.get(mes_input)
if not mes:
print("Chatbot: Mês não reconhecido. Tenta novamente.")
continue
else:
mes = datetime.now().month
ano = datetime.now().year
data = compare_kwh_current_vs_previous_year(month=mes, year=ano)
print(f"Chatbot: {data}")
continue
if re.search(r"quantas faturas.*mês", user_input.lower()):
match = re.search(
r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?",
user_input.lower()
)
if match:
mes_input = match.group(1).strip().lower()
ano = int(match.group(2)) if match.group(2) else datetime.now().year
if mes_input.isdigit():
mes = int(mes_input)
else:
mes = month_map.get(mes_input)
if not mes:
print("Chatbot: Mês não reconhecido. Tenta novamente.")
continue
else:
mes = datetime.now().month
ano = datetime.now().year
data = get_invoices_by_month_year(month=mes, year=ano)
print(f"Chatbot: {data}")
continue
if re.search(r"faturas.*instalações.*inativas", user_input.lower()):
data = get_invoices_from_inactive_units()
print(f"Chatbot: {data}")
continue
if re.search(r"total de kwh.*tipo de edifícios", user_input.lower()):
match = re.search(r"tipo de edifícios\s+([a-zçãõáéíóúâêîôûäëïöü\s]+)", user_input.lower())
building_type = match.group(1).strip() if match else None
if building_type:
data = get_total_kwh_by_building_type(building_type=building_type)
print(f"Chatbot: Aqui está o total de kWh para o tipo de edifício '{building_type}':\n{data}")
else:
data = get_total_kwh_by_building_type()
print(f"Chatbot: Aqui está o total de kWh por tipo de edifício:\n{data}")
continue
if "dados" in user_input.lower():
data = get_data()
print(f"\nDados do SQL Server:\n{data}")
continue
response = chat_with_gpt(user_input)
print("Chatbot: ", response)