Novos Filtos e Correção de erros

This commit is contained in:
Ricardo Cunha 2025-05-08 16:58:10 +01:00
parent 5b45e83693
commit 8702cccb4a

474
main.py
View File

@ -5,6 +5,7 @@ import pyodbc
import json
import re
from dotenv import load_dotenv
from datetime import datetime
load_dotenv()
@ -25,27 +26,6 @@ def connect_db():
print(f"Erro ao conectar à base de dados: {e}")
return None
def get_entry_by_position(position=1):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
query = f"SELECT * FROM CUnitBills ORDER BY Id ASC OFFSET {position-1} ROWS FETCH NEXT 1 ROWS ONLY"
cursor.execute(query)
row = cursor.fetchone()
if row:
columns = [column[0] for column in cursor.description]
formatted_data = "\n".join([f"- **{column}**: {value if value is not None else 'Não disponível'}" for column, value in zip(columns, row)])
return formatted_data
return "Nenhum dado encontrado."
except Exception as e:
return f"Erro ao buscar dados: {e}"
finally:
conn.close()
def get_data(atributes=None, limit=20):
conn = connect_db()
@ -132,9 +112,10 @@ def parse_user_input(user_input):
total_requested = False
match_cunit_id = re.search(r"CUnitId\s*(?:de)?\s*(\d+)", user_input, re.IGNORECASE)
match_date_billling_begin = re.search(r"DateBilllingBegin\s*(?:maior que|>=)\s*([\d-]+)", user_input, re.IGNORECASE)
match_date_billing_end = re.search(r"DateBillingEnd\s*(?:igual a|=)\s*([\d-]+)", user_input, re.IGNORECASE)
match_total = re.search(r"total.*CUnitId", user_input, re.IGNORECASE)
match_date_billling_begin = re.search(r"DateBilllingBegin\s*(?:maior que|>=?)\s*([\d]{4}-[\d]{2}-[\d]{2})", user_input, re.IGNORECASE)
match_date_billing_end = re.search(r"DateBillingEnd\s*(?:igual a|=)\s*([\d]{4}-[\d]{2}-[\d]{2})", user_input, re.IGNORECASE)
match_total = re.search(r"total\s+do\s+.*?CUnitId\s*(\d+)", user_input, re.IGNORECASE)
if match_cunit_id:
cunit_id = int(match_cunit_id.group(1))
@ -149,7 +130,6 @@ def parse_user_input(user_input):
total_requested = True
return cunit_id, date_billling_begin, date_billing_end, total_requested
def chat_with_gpt(prompt, attempts=3):
for i in range(attempts):
@ -167,13 +147,139 @@ def chat_with_gpt(prompt, attempts=3):
else:
return "Erro: Limite de requisições atingido várias vezes. Tente novamente mais tarde."
except Exception as e:
return f"Erro na API OpenAI: {e}"
return f"Erro genérico com a API OpenAI: {type(e).__name__} - {e}"
def get_price_comparison():
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
cursor = conn.cursor()
query = """
SELECT
cb.CUnitId,
cu.DisplayName,
cbi.UnitPrice AS PrecoFaturado,
tp.EnergyPrice AS PrecoContratualizado,
(cbi.UnitPrice - tp.EnergyPrice) AS Diferenca
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
JOIN CUnits cu ON cb.CUnitId = cu.Id
JOIN TariffPrices tp ON cbi.ProductsId = tp.ProductId
ORDER BY Diferenca DESC;
"""
# futuro:
# CASE WHEN tp.EnergyPrice != 0 THEN ((cbi.UnitPrice - tp.EnergyPrice) / tp.EnergyPrice) * 100 ELSE NULL END AS Percentual
cursor.execute(query)
rows = cursor.fetchall()
conn.close()
if rows:
return "\n".join([f"- {row}" for row in rows])
return "Nenhum dado encontrado."
except Exception as e:
conn.close()
return f"Erro ao buscar dados: {e}"
def get_top_consumers_by_month_year(month=None, year = None):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
cursor = conn.cursor()
now = datetime.now()
month = month or now.month
year = year or now.year
query = """
SELECT TOP 4
cb.CUnitId,
cu.DisplayName,
SUM(cbi.Quantity) AS ConsumoTotal
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
JOIN CUnits cu ON cb.CUnitId = cu.Id
WHERE YEAR(cb.DateBilllingBegin) = ?
AND MONTH(cb.DateBilllingBegin) = ?
GROUP BY cb.CUnitId, cu.DisplayName
ORDER BY ConsumoTotal DESC;
"""
cursor.execute(query, (year, month))
rows = cursor.fetchall()
conn.close()
if rows:
return "\n".join([f"- {row}" for row in rows])
return "Nenhum dado encontrado."
except Exception as e:
conn.close()
return f"Erro ao buscar dados: {e}"
def get_top_consumers(current=True, year=None):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
cursor = conn.cursor()
if current:
query = """
SELECT TOP 4
cb.CUnitId,
cu.DisplayName,
SUM(cbi.Quantity) AS ConsumoTotal
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
JOIN CUnits cu ON cb.CUnitId = cu.Id
WHERE YEAR(cb.DateBilllingBegin) = YEAR(GETDATE())
AND MONTH(cb.DateBilllingBegin) = MONTH(GETDATE())
GROUP BY cb.CUnitId, cu.DisplayName
ORDER BY ConsumoTotal DESC;
"""
cursor.execute(query)
else:
if year:
query = """
SELECT TOP 4
cb.CUnitId,
cu.DisplayName,
SUM(cbi.Quantity) AS ConsumoTotal
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
JOIN CUnits cu ON cb.CUnitId = cu.Id
WHERE YEAR(cb.DateBilllingBegin) = ?
AND MONTH(cb.DateBilllingBegin) BETWEEN 1 AND 4
GROUP BY cb.CUnitId, cu.DisplayName
ORDER BY ConsumoTotal DESC;
"""
cursor.execute(query, (year,))
else:
query = """
SELECT TOP 4
cb.CUnitId,
cu.DisplayName,
SUM(cbi.Quantity) AS ConsumoTotal
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
JOIN CUnits cu ON cb.CUnitId = cu.Id
WHERE MONTH(cb.DateBilllingBegin) BETWEEN 1 AND 4
GROUP BY cb.CUnitId, cu.DisplayName
ORDER BY ConsumoTotal DESC;
"""
cursor.execute(query)
rows = cursor.fetchall()
conn.close()
if rows:
return "\n".join([f"- {row}" for row in rows])
return "Nenhum dado encontrado."
except Exception as e:
conn.close()
return f"Erro ao buscar dados: {e}"
def get_total_by_cunit(cunit_id, limit=10):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
query = f"""
@ -194,6 +300,183 @@ def get_total_by_cunit(cunit_id, limit=10):
finally:
conn.close()
def compare_current_vs_previous_year(month=None, year=None):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
now = datetime.now()
month = month or now.month
year = year or now.year
query_current = """
SELECT SUM(cb.Total) AS TotalAtual
FROM CUnitBills cb
WHERE YEAR(cb.DateBilllingBegin) = ?
AND MONTH(cb.DateBilllingBegin) = ?
"""
cursor.execute(query_current, (year, month))
total_atual = cursor.fetchone()[0] or 0
query_previous = """
SELECT SUM(cb.Total) AS TotalAnterior
FROM CUnitBills cb
WHERE YEAR(cb.DateBilllingBegin) = ?
AND MONTH(cb.DateBilllingBegin) = ?
"""
cursor.execute(query_previous, (year - 1, month))
total_anterior = cursor.fetchone()[0] or 0
if total_atual > total_anterior:
return f"Estamos a pagar mais em {month:02}/{year} ({total_atual:.2f}) do que no mesmo período do ano passado ({total_anterior:.2f})."
elif total_atual < total_anterior:
return f"Estamos a pagar menos em {month:02}/{year} ({total_atual:.2f}) do que no mesmo período do ano passado ({total_anterior:.2f})."
else:
return f"Estamos a pagar exatamente o mesmo em {month:02}/{year} que no mesmo período do ano passado."
except Exception as e:
return f"Erro ao comparar os totais: {e}"
finally:
conn.close()
def compare_kwh_current_vs_previous_year(month=None, year=None):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
now = datetime.now()
month = month or now.month
year = year or now.year
query_current = """
SELECT SUM(cbi.Quantity) AS TotalAtual
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
WHERE YEAR(cb.DateBilllingBegin) = ?
AND MONTH(cb.DateBilllingBegin) = ?
"""
cursor.execute(query_current, (year, month))
total_atual = cursor.fetchone()[0] or 0
total_atual = float(total_atual)
query_previous = """
SELECT SUM(cbi.Quantity) AS TotalAnterior
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
WHERE YEAR(cb.DateBilllingBegin) = ?
AND MONTH(cb.DateBilllingBegin) = ?
"""
cursor.execute(query_previous, (year - 1, month))
total_anterior = cursor.fetchone()[0] or 0
total_anterior = float(total_anterior)
if abs(total_atual - total_anterior) <= 0.05 * total_anterior:
return f"O total de kwh no mês {month:02}/{year} ({total_atual:.2f}) está semelhante ao do mesmo mês do ano anterior ({total_anterior:.2f})."
elif total_atual > total_anterior:
return f"O total de kwh no mês {month:02}/{year} ({total_atual:.2f}) é maior que no mesmo mês do ano anterior ({total_anterior:.2f})."
else:
return f"O total de kwh no mês {month:02}/{year} ({total_atual:.2f}) é menor que no mesmo mês do ano anterior ({total_anterior:.2f})."
except Exception as e:
return f"Erro ao comparar os totais de kwh: {e}"
finally:
conn.close()
def get_invoices_by_month_year(month=None, year=None):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
now = datetime.now()
month = month or now.month
year = year or now.year
query = """
SELECT COUNT(*) AS TotalFaturas
FROM CUnitBills
WHERE YEAR(RevisionDate) = ?
AND MONTH(RevisionDate) = ?
"""
cursor.execute(query, (year, month))
total_faturas = cursor.fetchone()[0]
if total_faturas > 0:
return f"Foram inseridas {total_faturas} faturas no mês {month:02}/{year}."
else:
return f"Não foram inseridas faturas no mês {month:02}/{year}."
except Exception as e:
return f"Erro ao buscar as faturas: {e}"
finally:
conn.close()
def get_invoices_from_inactive_units():
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
query = """
SELECT cb.Id AS FaturaId, cu.DisplayName AS Instalacao, cb.DateBilllingBegin, cb.DateBillingEnd, cb.Total
FROM CUnitBills cb
JOIN CUnits cu ON cb.CUnitId = cu.Id
WHERE cu.Active = 0
"""
cursor.execute(query)
rows = cursor.fetchall()
if rows:
formatted_rows = "\n".join([
f"- **FaturaId**: {row[0]}, **Instalação**: {row[1]}, **Início**: {row[2]}, **Fim**: {row[3]}, **Total**: {row[4]:.2f}"
for row in rows
])
return f"Faturas de instalações inativas:\n{formatted_rows}"
else:
return "Não foram encontradas faturas de instalações inativas."
except Exception as e:
return f"Erro ao buscar faturas de instalações inativas: {e}"
finally:
conn.close()
def get_total_kwh_by_building_type(building_type=None):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
query = """
SELECT
ct.DisplayName AS TipoEdificio,
SUM(cbi.Quantity) AS TotalKwh
FROM CUnitBillItems cbi
JOIN CUnitBills cb ON cbi.BillId = cb.Id
JOIN CUnits cu ON cb.CUnitId = cu.Id
JOIN CUnitTypes ct ON cu.CUnitType = ct.Id
"""
if building_type:
query += " WHERE ct.DisplayName = ?"
query += " GROUP BY ct.DisplayName ORDER BY TotalKwh DESC"
cursor.execute(query, (building_type,))
else:
query += " GROUP BY ct.DisplayName ORDER BY TotalKwh DESC"
cursor.execute(query)
rows = cursor.fetchall()
if rows:
formatted_rows = "\n".join([
f"- **Tipo de Edifício**: {row[0]}, **Total kWh**: {row[1]:.2f}"
for row in rows
])
return formatted_rows
else:
if building_type:
return f"Nenhum dado encontrado para o tipo de edifício '{building_type}'. Verifique se o nome está correto."
return "Nenhum dado encontrado para os tipos de edifício."
except Exception as e:
return f"Erro ao buscar os dados: {e}"
finally:
conn.close()
if __name__ == "__main__":
@ -203,6 +486,21 @@ if __name__ == "__main__":
conn.close()
else:
print("Erro ao conectar à base de dados.")
month_map = {
"janeiro": 1,
"fevereiro": 2,
"março": 3,
"março": 3,
"abril": 4,
"maio": 5,
"junho": 6,
"julho": 7,
"agosto": 8,
"setembro": 9,
"outubro": 10,
"novembro": 11,
"dezembro": 12
}
while True:
user_input = input("Eu: ")
@ -215,15 +513,131 @@ if __name__ == "__main__":
data = get_total_by_cunit(cunit_id)
print(f"Chatbot: Aqui estão os totais encontrados:\n{data}")
continue
if cunit_id or date_billling_begin or date_billing_end:
data = get_filtered_data(cunit_id, date_billling_begin, date_billing_end)
print(f"Chatbot: Aqui estão os dados encontrados:\n{data}")
continue
if "preços faturados" in user_input.lower():
data = get_price_comparison()
print(f"Chatbot: Aqui está a comparação dos preços:\n{data}")
continue
if re.search(r"mês atual.*igual período.*ano anterior", user_input.lower()):
data = compare_current_vs_previous_year()
print(f"Chatbot: {data}")
continue
if re.search(r"mês.*igual período.*ano anterior", user_input.lower()):
match = re.search(
r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?",
user_input.lower()
)
if match:
mes_input = match.group(1).strip().lower()
ano = int(match.group(2)) if match.group(2) else datetime.now().year
if mes_input.isdigit():
mes = int(mes_input)
else:
mes = month_map.get(mes_input)
if not mes:
print("Chatbot: Mês não reconhecido. Tenta novamente.")
continue
else:
mes = datetime.now().month
ano = datetime.now().year
data = compare_current_vs_previous_year(month=mes, year=ano)
print(f"Chatbot: {data}")
continue
if "homólogo" in user_input.lower():
match = re.search(r"homólogo.*?(\d{4})", user_input.lower())
ano = int(match.group(1)) if match else None
data = get_top_consumers(current=False, year=ano)
if ano:
print(f"Chatbot: Aqui estão as instalações com maior consumo no período homólogo de {ano}:\n{data}")
else:
print(f"Chatbot: Aqui estão as instalações com maior consumo no período homólogo atual:\n{data}")
continue
if re.search(r"total de kwh.*mês.*ano anterior", user_input.lower()):
match = re.search(
r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?",
user_input.lower()
)
if match:
mes_input = match.group(1).strip().lower()
ano = int(match.group(2)) if match.group(2) else datetime.now().year
if mes_input.isdigit():
mes = int(mes_input)
else:
mes = month_map.get(mes_input)
if not mes:
print("Chatbot: Mês não reconhecido. Tenta novamente.")
continue
else:
mes = datetime.now().month
ano = datetime.now().year
data = compare_kwh_current_vs_previous_year(month=mes, year=ano)
print(f"Chatbot: {data}")
continue
if re.search(r"quantas faturas.*mês", user_input.lower()):
match = re.search(
r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?",
user_input.lower()
)
if match:
mes_input = match.group(1).strip().lower()
ano = int(match.group(2)) if match.group(2) else datetime.now().year
if mes_input.isdigit():
mes = int(mes_input)
else:
mes = month_map.get(mes_input)
if not mes:
print("Chatbot: Mês não reconhecido. Tenta novamente.")
continue
else:
mes = datetime.now().month
ano = datetime.now().year
data = get_invoices_by_month_year(month=mes, year=ano)
print(f"Chatbot: {data}")
continue
if re.search(r"faturas.*instalações.*inativas", user_input.lower()):
data = get_invoices_from_inactive_units()
print(f"Chatbot: {data}")
continue
if re.search(r"total de kwh.*tipo de edifícios", user_input.lower()):
match = re.search(r"tipo de edifícios\s+([a-zçãõáéíóúâêîôûäëïöü\s]+)", user_input.lower())
building_type = match.group(1).strip() if match else None
if building_type:
data = get_total_kwh_by_building_type(building_type=building_type)
print(f"Chatbot: Aqui está o total de kWh para o tipo de edifício '{building_type}':\n{data}")
else:
data = get_total_kwh_by_building_type()
print(f"Chatbot: Aqui está o total de kWh por tipo de edifício:\n{data}")
continue
if "dados" in user_input.lower():
data = get_data()
print(f"\nDados do SQL Server:\n{data}")
continue
response = chat_with_gpt(user_input)
print("Chatbot: ", response)
response = chat_with_gpt(user_input)
print("Chatbot: ", response)