ChatBot - Tranduções a funcionar com a estrutura

This commit is contained in:
Ricardo Cunha 2025-06-17 01:47:36 +01:00
parent a44bdc5fce
commit d823baed31
3 changed files with 247 additions and 180 deletions

View File

@ -28,23 +28,20 @@ def connect_db():
return None
def get_data(atributes=None, limit=20):
def get_data(table_name="CUnitBills", atributes=None, limit=20):
conn = connect_db()
if not conn:
return "Erro: Não foi possível conectar à base de dados."
try:
with conn.cursor() as cursor:
if atributes:
query = f"SELECT TOP {limit} * FROM CUnitBills WHERE Number LIKE ? ORDER BY Id ASC"
query = f"SELECT TOP {limit} * FROM {table_name} WHERE Number LIKE ? ORDER BY Id ASC"
cursor.execute(query, (f"%{atributes}%",))
else:
query = f"SELECT TOP {limit} * FROM CUnitBills ORDER BY Id ASC "
query = f"SELECT TOP {limit} * FROM {table_name} ORDER BY Id ASC"
cursor.execute(query)
columns = [column[0] for column in cursor.description]
rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
if rows:
formatted_rows = "\n\n".join([
"\n".join([f"- **{column}**: {row[column]}" for column in columns]) for row in rows
@ -733,6 +730,11 @@ if __name__ == "__main__":
print(f"Chatbot: Aqui está o total de kWh por tipo de edifício:\n{data}")
continue
if user_input.lower() in ["cunitbills", "cunits", "cunittypes"]:
data = get_data(table_name=user_input)
print(f"\nDados da tabela {user_input}:\n{data}")
continue
if "dados" in user_input.lower():
data = get_data()
print(f"\nDados do SQL Server:\n{data}")

View File

@ -2,229 +2,294 @@ from flask import Flask, request, jsonify
from flask_cors import CORS
import json
import os
from main import (
chat_with_gpt, parse_user_input, get_total_by_cunit, get_filtered_data,
get_price_comparison, compare_current_vs_previous_year, get_top_consumers,
compare_kwh_current_vs_previous_year, get_invoices_by_month_year,get_schema_with_examples,
get_invoices_from_inactive_units, get_total_kwh_by_building_type, get_data
)
import re
from datetime import datetime
import pyodbc
import pyodbc
from main import (
chat_with_gpt,
parse_user_input,
get_total_by_cunit,
get_filtered_data,
get_price_comparison,
compare_current_vs_previous_year,
get_top_consumers,
compare_kwh_current_vs_previous_year,
get_invoices_by_month_year,
get_schema_with_examples,
get_invoices_from_inactive_units,
get_total_kwh_by_building_type,
get_data,
)
app = Flask(__name__)
CORS(app)
CORS(app)
month_map = {
"janeiro": 1, "fevereiro": 2, "março": 3, "abril": 4, "maio": 5, "junho": 6,
"julho": 7, "agosto": 8, "setembro": 9, "outubro": 10, "novembro": 11, "dezembro": 12
"julho": 7, "agosto": 8, "setembro": 9, "outubro": 10, "novembro": 11, "dezembro": 12,
}
column_mapping = {
"id": "Id",
"tipo de energia": "EnergyTypesId",
"unidade de consumo": "CUnitId",
"tipo de fatura": "CUnitBillsInvoiceTypeId",
"tipo de documento": "DocumentTypeId",
"contrato eletricidade": "CUnitContractElectId",
"número": "Number",
"data de receção da fatura": "DateBillReceipt",
"data": "Date",
"data inicial": "DateBilllingBegin",
"data final": "DateBillingEnd",
"prazo de pagamento": "PaymentDeadline",
"total": "Total",
"mb entidade": "MBEnt",
"mb referência": "MBRef",
"saldo anterior": "PreviousBalance",
"saldo anterior dc": "PreviousBalanceDC",
"pagamentos efetuados": "PaymentsMade",
"saldo de pagamentos efetuados": "PaymentsMadeBalance",
"saldo de pagamentos efetuados dc": "PaymentsMadeBalanceDC",
"faturado": "Billed",
"saldo faturado": "BilledBalance",
"saldo faturado dc": "BilledBalanceDC",
"saldo atual": "CurrentBalance",
"saldo atual dc": "CurrentBalanceDC",
"fator de potência": "PowerFactor",
"potência tomada": "PotTomada",
"total sem iva normal": "TotalExcludingNormalVAT",
"total sem iva reduzido": "TotalExcludingReducedVAT",
"total iva normal": "TotalNormalVAT",
"total iva reduzido": "TotalReducedVAT",
"emissão co2": "CO2Emission",
"consumo médio período faturação": "AvgConsBillingPeriod",
"consumo médio últimos 12m": "AvgConsLast12M",
"informação adicional": "AddicionalInfo",
"data de revisão": "RevisionDate",
"número normalizado": "NormalizedNumber",
"data de pagamento": "PaymentDate"
# CUnitsBills
"Id": "Id",
"Tipo de Energia": "EnergyTypesId",
"Unidade de Consumo": "CUnitId",
"Tipo de Fatura": "CUnitBillsInvoiceTypeId",
"Tipo de Documento": "DocumentTypeId",
"Contrato de Eletricidade": "CUnitContractElectId",
"Número": "Number",
"Data de Receção da Fatura": "DateBillReceipt",
"Data": "Date",
"Data Inicial": "DateBilllingBegin",
"Data Final": "DateBillingEnd",
"Prazo de Pagamento": "PaymentDeadline",
"Total": "Total",
"MB Entidade": "MBEnt",
"MB Referência": "MBRef",
"Saldo Anterior": "PreviousBalance",
"Saldo Anterior DC": "PreviousBalanceDC",
"Pagamentos Efetuados": "PaymentsMade",
"Saldo de Pagamentos Efetuados": "PaymentsMadeBalance",
"Saldo de Pagamentos Efetuados DC": "PaymentsMadeBalanceDC",
"Faturado": "Billed",
"Saldo Faturado": "BilledBalance",
"Saldo Faturado DC": "BilledBalanceDC",
"Saldo Atual": "CurrentBalance",
"Saldo Atual DC": "CurrentBalanceDC",
"Fator de Potência": "PowerFactor",
"Potência Tomada": "PotTomada",
"Total sem IVA Normal": "TotalExcludingNormalVAT",
"Total sem IVA Reduzido": "TotalExcludingReducedVAT",
"Total IVA Normal": "TotalNormalVAT",
"Total IVA Reduzido": "TotalReducedVAT",
"Emissão CO2": "CO2Emission",
"Consumo Médio Período Faturação": "AvgConsBillingPeriod",
"Consumo Médio Últimos 12 Meses": "AvgConsLast12M",
"Informação Adicional": "AddicionalInfo",
"Data de Revisão": "RevisionDate",
"Número Normalizado": "NormalizedNumber",
"Data de Pagamento": "PaymentDate",
# CUnits
"Id": "Id",
"Cidade": "CitiesId",
"Tipo de unidade": "CUnitType",
"Ativo": "Active",
"Apagado": "IsDeleted",
"Código CIL": "CUnitCodeCIL",
"Código CPE": "CUnitCodeCPE",
"Nome da Instalação": "DisplayName",
"Morada": "Address",
"Localidade": "LocalityId",
"Contrato Eletricidade": "ContractElectricityId",
"Contrato Gás": "ContratoGasId",
"Distribuição Potência": "PowerDistributionId",
"Código Postal": "PostalCode",
"Descrição": "Description",
"Latitude": "LatitudeCoordinate",
"Longitude": "LongitudeCoordinate",
"Subestações": "Substations",
"Divisão Cidade": "CityDivisionId",
"ID Construção": "CUnitConstructionId",
"Uso": "CUnitUseId",
"Classificação Energética": "CUnitEnergyClassifId",
"url Consumo": "urlConsumo",
"url CCTV": "urlCCTV",
"url AVAC": "urlAVAC",
"Caminho CCTV": "pathCCTV",
"Data Inativa": "DataInativa",
"data de Revisão": "RevisionDate",
"Ver só Unidades com Permissão": "ViewOnlyCunitsWithPermission",
"Localização Contador": "MeterLocation",
# CUnitTypes
"Id Tipo": "Id",
"Nome Tipo": "DisplayName",
"Abreviatura": "ShortName",
"Descrição Tipo": "Description",
"Ativo Tipo": "Active",
}
column_mapping_inv = {v.lower(): k for k, v in column_mapping.items()}
def translate_record(record: dict, mapping_inv: dict) -> dict:
"""Traduz as chaves técnicas de um dict para PT usando mapping_inv."""
return {mapping_inv.get(k.lower(), k): v for k, v in record.items()}
def translate_any(data, mapping_inv: dict):
"""Traduz dict ou lista de dicts; mantém outros tipos intactos."""
if isinstance(data, list):
return [translate_record(row, mapping_inv) for row in data]
if isinstance(data, dict):
return translate_record(data, mapping_inv)
return data
def dicts_to_markdown(rows):
"""Converte dict(s) em string Markdown."""
if isinstance(rows, dict):
rows = [rows]
if isinstance(rows, list) and rows and isinstance(rows[0], dict):
return "\n".join(
" - ".join(f"**{k}**: {v}" for k, v in row.items()) for row in rows
)
return str(rows)
def translate_text_block(text: str, mapping_inv: dict) -> str:
"""Substitui nomes técnicos em blocos de texto por PT (caseinsensitive)."""
for eng, pt in sorted(mapping_inv.items(), key=lambda x: -len(x[0])):
pattern = re.compile(re.escape(eng), re.IGNORECASE)
text = pattern.sub(pt, text)
return text
def make_reply(content):
"""Aplica tradução final se for string, devolve dict jsonifyready."""
if isinstance(content, (dict, list)):
content = dicts_to_markdown(content)
content = translate_text_block(content, column_mapping_inv)
return jsonify({"reply": content})
def get_db_schema():
conn = pyodbc.connect('DRIVER={SQL Server};SERVER=SEU_SERVIDOR;DATABASE=SEU_BANCO;UID=USUARIO;PWD=SENHA')
conn = pyodbc.connect(
"DRIVER={SQL Server};SERVER=SEU_SERVIDOR;DATABASE=SEU_BANCO;UID=USUARIO;PWD=SENHA"
)
cursor = conn.cursor()
tables = []
for row in cursor.tables(tableType='TABLE'):
table_name = row.table_name
columns = []
for col in cursor.columns(table=table_name):
columns.append({'name': col.column_name, 'type': col.type_name})
tables.append({'table': table_name, 'columns': columns})
for row in cursor.tables(tableType="TABLE"):
cols = [
{"name": col.column_name, "type": col.type_name}
for col in cursor.columns(table=row.table_name)
]
tables.append({"table": row.table_name, "columns": cols})
conn.close()
return tables
def save_schema_to_file(schema, mapping, folder='schema'):
os.makedirs(folder, exist_ok=True)
with open(os.path.join(folder, 'schema.json'), 'w', encoding='utf-8') as f:
json.dump({'schema': schema, 'mapping': mapping}, f, ensure_ascii=False, indent=2)
def answer_schema_question(question):
def save_schema_to_file(schema, mapping, folder="schema"):
os.makedirs(folder, exist_ok=True)
with open(os.path.join(folder, "schema.json"), "w", encoding="utf-8") as f:
json.dump({"schema": schema, "mapping": mapping}, f, ensure_ascii=False, indent=2)
def answer_schema_question(question: str) -> str:
schema = get_schema_with_examples()
question = question.lower()
if "tabelas" in question:
return "As tabelas disponíveis são: " + ", ".join([t['table'] for t in schema])
q_lower = question.lower()
if "tabelas" in q_lower:
return "As tabelas disponíveis são: " + ", ".join(t["table"] for t in schema)
for t in schema:
if t['table'].lower() in question:
cols = ", ".join([c['name'] for c in t['columns']])
return f"A tabela {t['table']} tem as colunas: {cols}.\nExemplo: {t['example']}"
if t["table"].lower() in q_lower:
cols = ", ".join(c["name"] for c in t["columns"])
exemplo_pt = translate_record(t["example"], column_mapping_inv)
return (
f"A tabela {t['table']} tem as colunas: {cols}.\n"
f"Exemplo traduzido: {dicts_to_markdown(exemplo_pt)}"
)
return "Não consegui encontrar informação sobre essa tabela ou coluna."
@app.route('/api/schema-exemplo', methods=['GET'])
@app.route("/api/schema-exemplo", methods=["GET"])
def schema_exemplo():
schema_info = get_schema_with_examples()
return jsonify({'schema': schema_info})
return jsonify({"schema": get_schema_with_examples()})
@app.route('/api/schema', methods=['GET'])
@app.route("/api/schema", methods=["GET"])
def schema():
schema_info = get_db_schema()
save_schema_to_file(schema_info, column_mapping)
return jsonify({'schema': schema_info, 'mapping': column_mapping})
return jsonify({"schema": schema_info, "mapping": column_mapping})
@app.route('/api/chat', methods=['POST'])
@app.route("/api/chat", methods=["POST"])
def chat():
data = request.json
user_input = data.get('message', '').lower()
if "tabela" in user_input or "coluna" in user_input:
resposta = answer_schema_question(user_input)
return jsonify({'reply': resposta})
if "colunas" in user_input and ("fatura" in user_input or "cunitbills" in user_input):
schema_info = get_db_schema()
for table in schema_info:
if table['table'].lower() == 'cunitbills':
colunas = [col['name'] for col in table['columns']]
return jsonify({'reply': "As colunas da tabela de faturas são:\n" + ", ".join(colunas)})
return jsonify({'reply': "Tabela de faturas não encontrada no banco de dados."})
user_input = data.get("message", "")
lower_input = user_input.lower()
cunit_id, date_billling_begin, date_billing_end, total_requested = parse_user_input(user_input)
if "tabela" in lower_input or "coluna" in lower_input:
return make_reply(answer_schema_question(lower_input))
if "colunas" in lower_input and ("fatura" in lower_input or "cunitbills" in lower_input):
for table in get_db_schema():
if table["table"].lower() == "cunitbills":
return make_reply("As colunas da tabela de faturas são:\n" + ", ".join(c["name"] for c in table["columns"]))
return make_reply("Tabela de faturas não encontrada.")
cunit_id, date_begin, date_end, total_requested = parse_user_input(lower_input)
if total_requested and cunit_id:
data = get_total_by_cunit(cunit_id)
return jsonify({'reply': f"Aqui estão os totais encontrados:\n{data}"})
return make_reply(translate_any(get_total_by_cunit(cunit_id), column_mapping_inv))
if cunit_id or date_billling_begin or date_billing_end:
data = get_filtered_data(cunit_id, date_billling_begin, date_billing_end)
return jsonify({'reply': f"Aqui estão os dados encontrados:\n{data}"})
if cunit_id or date_begin or date_end:
return make_reply(translate_any(get_filtered_data(cunit_id, date_begin, date_end), column_mapping_inv))
if "preços faturados" in user_input.lower():
data = get_price_comparison()
return jsonify({'reply': f"Aqui está a comparação dos preços:\n{data}"})
if "preços faturados" in lower_input:
return make_reply(translate_any(get_price_comparison(), column_mapping_inv))
if re.search(r"mês atual.*igual período.*ano anterior", user_input.lower()):
data = compare_current_vs_previous_year()
return jsonify({'reply': data})
if re.search(r"mês atual.*igual período.*ano anterior", lower_input):
return make_reply(translate_any(compare_current_vs_previous_year(), column_mapping_inv))
if re.search(r"mês.*igual período.*ano anterior", user_input.lower()):
match = re.search(
r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?",
user_input.lower()
)
if match:
mes_input = match.group(1).strip().lower()
ano = int(match.group(2)) if match.group(2) else datetime.now().year
if mes_input.isdigit():
mes = int(mes_input)
else:
mes = month_map.get(mes_input)
if re.search(r"mês.*igual período.*ano anterior", lower_input):
mt = re.search(r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?", lower_input)
if mt:
mes_txt = mt.group(1).strip().lower()
ano = int(mt.group(2)) if mt.group(2) else datetime.now().year
mes = int(mes_txt) if mes_txt.isdigit() else month_map.get(mes_txt)
if not mes:
return jsonify({'reply': "Mês não reconhecido. Tenta novamente."})
return make_reply("Mês não reconhecido.")
else:
mes = datetime.now().month
ano = datetime.now().year
data = compare_current_vs_previous_year(month=mes, year=ano)
return jsonify({'reply': data})
mes, ano = datetime.now().month, datetime.now().year
return make_reply(translate_any(compare_current_vs_previous_year(month=mes, year=ano), column_mapping_inv))
if "homólogo" in user_input.lower():
match = re.search(r"homólogo.*?(\d{4})", user_input.lower())
ano = int(match.group(1)) if match else None
data = get_top_consumers(current=False, year=ano)
if ano:
return jsonify({'reply': f"Aqui estão as instalações com maior consumo no período homólogo de {ano}:\n{data}"})
else:
return jsonify({'reply': f"Aqui estão as instalações com maior consumo no período homólogo atual:\n{data}"})
if "homólogo" in lower_input:
mt = re.search(r"homólogo.*?(\d{4})", lower_input)
ano = int(mt.group(1)) if mt else None
cabec = f"no período homólogo de {ano}" if ano else "no período homólogo atual"
return make_reply(f"Aqui estão as instalações com maior consumo {cabec}:\n" + dicts_to_markdown(translate_any(get_top_consumers(current=False, year=ano), column_mapping_inv)))
if re.search(r"total de kwh.*mês.*ano anterior", user_input.lower()):
match = re.search(
r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?",
user_input.lower()
)
if match:
mes_input = match.group(1).strip().lower()
ano = int(match.group(2)) if match.group(2) else datetime.now().year
if mes_input.isdigit():
mes = int(mes_input)
else:
mes = month_map.get(mes_input)
if re.search(r"total de kwh.*mês.*ano anterior", lower_input):
mt = re.search(r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?", lower_input)
if mt:
mes_txt = mt.group(1).strip().lower()
ano = int(mt.group(2)) if mt.group(2) else datetime.now().year
mes = int(mes_txt) if mes_txt.isdigit() else month_map.get(mes_txt)
if not mes:
return jsonify({'reply': "Mês não reconhecido. Tenta novamente."})
return make_reply("Mês não reconhecido.")
else:
mes = datetime.now().month
ano = datetime.now().year
data = compare_kwh_current_vs_previous_year(month=mes, year=ano)
return jsonify({'reply': data})
mes, ano = datetime.now().month, datetime.now().year
return make_reply(translate_any(compare_kwh_current_vs_previous_year(month=mes, year=ano), column_mapping_inv))
if re.search(r"quantas faturas.*mês", user_input.lower()):
match = re.search(
r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?",
user_input.lower()
)
if match:
mes_input = match.group(1).strip().lower()
ano = int(match.group(2)) if match.group(2) else datetime.now().year
if mes_input.isdigit():
mes = int(mes_input)
else:
mes = month_map.get(mes_input)
if re.search(r"quantas faturas.*mês", lower_input):
mt = re.search(r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?", lower_input)
if mt:
mes_txt = mt.group(1).strip().lower()
ano = int(mt.group(2)) if mt.group(2) else datetime.now().year
mes = int(mes_txt) if mes_txt.isdigit() else month_map.get(mes_txt)
if not mes:
return jsonify({'reply': "Mês não reconhecido. Tenta novamente."})
return make_reply("Mês não reconhecido.")
else:
mes = datetime.now().month
ano = datetime.now().year
data = get_invoices_by_month_year(month=mes, year=ano)
return jsonify({'reply': data})
mes, ano = datetime.now().month, datetime.now().year
return make_reply(translate_any(get_invoices_by_month_year(month=mes, year=ano), column_mapping_inv))
if re.search(r"faturas.*instalações.*inativas", user_input.lower()):
data = get_invoices_from_inactive_units()
return jsonify({'reply': data})
if re.search(r"faturas.*instalações.*inativas", lower_input):
return make_reply(translate_any(get_invoices_from_inactive_units(), column_mapping_inv))
if re.search(r"total de kwh.*tipo de edifícios", user_input.lower()):
match = re.search(r"tipo de edifícios\s+([a-zçãõáéíóúâêîôûäëïöü\s]+)", user_input.lower())
building_type = match.group(1).strip() if match else None
if building_type:
data = get_total_kwh_by_building_type(building_type=building_type)
return jsonify({'reply': f"Aqui está o total de kWh para o tipo de edifício '{building_type}':\n{data}"})
else:
data = get_total_kwh_by_building_type()
return jsonify({'reply': f"Aqui está o total de kWh por tipo de edifício:\n{data}"})
if re.search(r"total de kwh.*tipo de edifícios", lower_input):
mt = re.search(r"tipo de edifícios\s+([a-zçãõáéíóúâêîôûäëïöü\s]+)", lower_input)
building = mt.group(1).strip() if mt else None
dados = get_total_kwh_by_building_type(building_type=building) if building else get_total_kwh_by_building_type()
pref = f"para o tipo de edifício '{building}'" if building else "por tipo de edifício"
return make_reply(f"Aqui está o total de kWh {pref}:\n" + dicts_to_markdown(translate_any(dados, column_mapping_inv)))
if "dados" in user_input.lower():
data = get_data()
return jsonify({'reply': f"\nDados do SQL Server:\n{data}"})
if "dados" in lower_input:
return make_reply(translate_any(get_data(), column_mapping_inv))
response = chat_with_gpt(user_input)
return jsonify({'reply': response})
return make_reply(chat_with_gpt(user_input))
if __name__ == '__main__':
app.run(port=3000, debug=True)
if __name__ == "__main__":
app.run(port=3000, debug=True)