diff --git a/ChatBot-Python/src/__pycache__/main.cpython-313.pyc b/ChatBot-Python/src/__pycache__/main.cpython-313.pyc index 700110c..db5ba98 100644 Binary files a/ChatBot-Python/src/__pycache__/main.cpython-313.pyc and b/ChatBot-Python/src/__pycache__/main.cpython-313.pyc differ diff --git a/ChatBot-Python/src/main.py b/ChatBot-Python/src/main.py index 9f648da..c4463d2 100644 --- a/ChatBot-Python/src/main.py +++ b/ChatBot-Python/src/main.py @@ -28,23 +28,20 @@ def connect_db(): return None -def get_data(atributes=None, limit=20): +def get_data(table_name="CUnitBills", atributes=None, limit=20): conn = connect_db() if not conn: return "Erro: Não foi possível conectar à base de dados." - try: with conn.cursor() as cursor: if atributes: - query = f"SELECT TOP {limit} * FROM CUnitBills WHERE Number LIKE ? ORDER BY Id ASC" + query = f"SELECT TOP {limit} * FROM {table_name} WHERE Number LIKE ? ORDER BY Id ASC" cursor.execute(query, (f"%{atributes}%",)) else: - query = f"SELECT TOP {limit} * FROM CUnitBills ORDER BY Id ASC " + query = f"SELECT TOP {limit} * FROM {table_name} ORDER BY Id ASC" cursor.execute(query) - columns = [column[0] for column in cursor.description] rows = [dict(zip(columns, row)) for row in cursor.fetchall()] - if rows: formatted_rows = "\n\n".join([ "\n".join([f"- **{column}**: {row[column]}" for column in columns]) for row in rows @@ -733,6 +730,11 @@ if __name__ == "__main__": print(f"Chatbot: Aqui está o total de kWh por tipo de edifício:\n{data}") continue + if user_input.lower() in ["cunitbills", "cunits", "cunittypes"]: + data = get_data(table_name=user_input) + print(f"\nDados da tabela {user_input}:\n{data}") + continue + if "dados" in user_input.lower(): data = get_data() print(f"\nDados do SQL Server:\n{data}") diff --git a/ChatBot-Python/src/server.py b/ChatBot-Python/src/server.py index 46669b9..d036fda 100644 --- a/ChatBot-Python/src/server.py +++ b/ChatBot-Python/src/server.py @@ -2,229 +2,294 @@ from flask import Flask, request, jsonify from flask_cors import CORS import json import os -from main import ( - chat_with_gpt, parse_user_input, get_total_by_cunit, get_filtered_data, - get_price_comparison, compare_current_vs_previous_year, get_top_consumers, - compare_kwh_current_vs_previous_year, get_invoices_by_month_year,get_schema_with_examples, - get_invoices_from_inactive_units, get_total_kwh_by_building_type, get_data -) import re from datetime import datetime -import pyodbc +import pyodbc + +from main import ( + chat_with_gpt, + parse_user_input, + get_total_by_cunit, + get_filtered_data, + get_price_comparison, + compare_current_vs_previous_year, + get_top_consumers, + compare_kwh_current_vs_previous_year, + get_invoices_by_month_year, + get_schema_with_examples, + get_invoices_from_inactive_units, + get_total_kwh_by_building_type, + get_data, +) app = Flask(__name__) -CORS(app) +CORS(app) month_map = { "janeiro": 1, "fevereiro": 2, "março": 3, "abril": 4, "maio": 5, "junho": 6, - "julho": 7, "agosto": 8, "setembro": 9, "outubro": 10, "novembro": 11, "dezembro": 12 + "julho": 7, "agosto": 8, "setembro": 9, "outubro": 10, "novembro": 11, "dezembro": 12, } column_mapping = { - "id": "Id", - "tipo de energia": "EnergyTypesId", - "unidade de consumo": "CUnitId", - "tipo de fatura": "CUnitBillsInvoiceTypeId", - "tipo de documento": "DocumentTypeId", - "contrato eletricidade": "CUnitContractElectId", - "número": "Number", - "data de receção da fatura": "DateBillReceipt", - "data": "Date", - "data inicial": "DateBilllingBegin", - "data final": "DateBillingEnd", - "prazo de pagamento": "PaymentDeadline", - "total": "Total", - "mb entidade": "MBEnt", - "mb referência": "MBRef", - "saldo anterior": "PreviousBalance", - "saldo anterior dc": "PreviousBalanceDC", - "pagamentos efetuados": "PaymentsMade", - "saldo de pagamentos efetuados": "PaymentsMadeBalance", - "saldo de pagamentos efetuados dc": "PaymentsMadeBalanceDC", - "faturado": "Billed", - "saldo faturado": "BilledBalance", - "saldo faturado dc": "BilledBalanceDC", - "saldo atual": "CurrentBalance", - "saldo atual dc": "CurrentBalanceDC", - "fator de potência": "PowerFactor", - "potência tomada": "PotTomada", - "total sem iva normal": "TotalExcludingNormalVAT", - "total sem iva reduzido": "TotalExcludingReducedVAT", - "total iva normal": "TotalNormalVAT", - "total iva reduzido": "TotalReducedVAT", - "emissão co2": "CO2Emission", - "consumo médio período faturação": "AvgConsBillingPeriod", - "consumo médio últimos 12m": "AvgConsLast12M", - "informação adicional": "AddicionalInfo", - "data de revisão": "RevisionDate", - "número normalizado": "NormalizedNumber", - "data de pagamento": "PaymentDate" + # CUnitsBills + "Id": "Id", + "Tipo de Energia": "EnergyTypesId", + "Unidade de Consumo": "CUnitId", + "Tipo de Fatura": "CUnitBillsInvoiceTypeId", + "Tipo de Documento": "DocumentTypeId", + "Contrato de Eletricidade": "CUnitContractElectId", + "Número": "Number", + "Data de Receção da Fatura": "DateBillReceipt", + "Data": "Date", + "Data Inicial": "DateBilllingBegin", + "Data Final": "DateBillingEnd", + "Prazo de Pagamento": "PaymentDeadline", + "Total": "Total", + "MB Entidade": "MBEnt", + "MB Referência": "MBRef", + "Saldo Anterior": "PreviousBalance", + "Saldo Anterior DC": "PreviousBalanceDC", + "Pagamentos Efetuados": "PaymentsMade", + "Saldo de Pagamentos Efetuados": "PaymentsMadeBalance", + "Saldo de Pagamentos Efetuados DC": "PaymentsMadeBalanceDC", + "Faturado": "Billed", + "Saldo Faturado": "BilledBalance", + "Saldo Faturado DC": "BilledBalanceDC", + "Saldo Atual": "CurrentBalance", + "Saldo Atual DC": "CurrentBalanceDC", + "Fator de Potência": "PowerFactor", + "Potência Tomada": "PotTomada", + "Total sem IVA Normal": "TotalExcludingNormalVAT", + "Total sem IVA Reduzido": "TotalExcludingReducedVAT", + "Total IVA Normal": "TotalNormalVAT", + "Total IVA Reduzido": "TotalReducedVAT", + "Emissão CO2": "CO2Emission", + "Consumo Médio Período Faturação": "AvgConsBillingPeriod", + "Consumo Médio Últimos 12 Meses": "AvgConsLast12M", + "Informação Adicional": "AddicionalInfo", + "Data de Revisão": "RevisionDate", + "Número Normalizado": "NormalizedNumber", + "Data de Pagamento": "PaymentDate", + # CUnits + "Id": "Id", + "Cidade": "CitiesId", + "Tipo de unidade": "CUnitType", + "Ativo": "Active", + "Apagado": "IsDeleted", + "Código CIL": "CUnitCodeCIL", + "Código CPE": "CUnitCodeCPE", + "Nome da Instalação": "DisplayName", + "Morada": "Address", + "Localidade": "LocalityId", + "Contrato Eletricidade": "ContractElectricityId", + "Contrato Gás": "ContratoGasId", + "Distribuição Potência": "PowerDistributionId", + "Código Postal": "PostalCode", + "Descrição": "Description", + "Latitude": "LatitudeCoordinate", + "Longitude": "LongitudeCoordinate", + "Subestações": "Substations", + "Divisão Cidade": "CityDivisionId", + "ID Construção": "CUnitConstructionId", + "Uso": "CUnitUseId", + "Classificação Energética": "CUnitEnergyClassifId", + "url Consumo": "urlConsumo", + "url CCTV": "urlCCTV", + "url AVAC": "urlAVAC", + "Caminho CCTV": "pathCCTV", + "Data Inativa": "DataInativa", + "data de Revisão": "RevisionDate", + "Ver só Unidades com Permissão": "ViewOnlyCunitsWithPermission", + "Localização Contador": "MeterLocation", + + # CUnitTypes + "Id Tipo": "Id", + "Nome Tipo": "DisplayName", + "Abreviatura": "ShortName", + "Descrição Tipo": "Description", + "Ativo Tipo": "Active", } +column_mapping_inv = {v.lower(): k for k, v in column_mapping.items()} + + +def translate_record(record: dict, mapping_inv: dict) -> dict: + """Traduz as chaves técnicas de um dict para PT usando mapping_inv.""" + return {mapping_inv.get(k.lower(), k): v for k, v in record.items()} + + +def translate_any(data, mapping_inv: dict): + """Traduz dict ou lista de dicts; mantém outros tipos intactos.""" + if isinstance(data, list): + return [translate_record(row, mapping_inv) for row in data] + if isinstance(data, dict): + return translate_record(data, mapping_inv) + return data + + +def dicts_to_markdown(rows): + """Converte dict(s) em string Markdown.""" + if isinstance(rows, dict): + rows = [rows] + if isinstance(rows, list) and rows and isinstance(rows[0], dict): + return "\n".join( + " - ".join(f"**{k}**: {v}" for k, v in row.items()) for row in rows + ) + return str(rows) + + +def translate_text_block(text: str, mapping_inv: dict) -> str: + """Substitui nomes técnicos em blocos de texto por PT (case‑insensitive).""" + for eng, pt in sorted(mapping_inv.items(), key=lambda x: -len(x[0])): + pattern = re.compile(re.escape(eng), re.IGNORECASE) + text = pattern.sub(pt, text) + return text + + +def make_reply(content): + """Aplica tradução final se for string, devolve dict jsonify‑ready.""" + if isinstance(content, (dict, list)): + content = dicts_to_markdown(content) + content = translate_text_block(content, column_mapping_inv) + return jsonify({"reply": content}) + def get_db_schema(): - conn = pyodbc.connect('DRIVER={SQL Server};SERVER=SEU_SERVIDOR;DATABASE=SEU_BANCO;UID=USUARIO;PWD=SENHA') + conn = pyodbc.connect( + "DRIVER={SQL Server};SERVER=SEU_SERVIDOR;DATABASE=SEU_BANCO;UID=USUARIO;PWD=SENHA" + ) cursor = conn.cursor() tables = [] - for row in cursor.tables(tableType='TABLE'): - table_name = row.table_name - columns = [] - for col in cursor.columns(table=table_name): - columns.append({'name': col.column_name, 'type': col.type_name}) - tables.append({'table': table_name, 'columns': columns}) + for row in cursor.tables(tableType="TABLE"): + cols = [ + {"name": col.column_name, "type": col.type_name} + for col in cursor.columns(table=row.table_name) + ] + tables.append({"table": row.table_name, "columns": cols}) conn.close() return tables -def save_schema_to_file(schema, mapping, folder='schema'): - os.makedirs(folder, exist_ok=True) - with open(os.path.join(folder, 'schema.json'), 'w', encoding='utf-8') as f: - json.dump({'schema': schema, 'mapping': mapping}, f, ensure_ascii=False, indent=2) -def answer_schema_question(question): +def save_schema_to_file(schema, mapping, folder="schema"): + os.makedirs(folder, exist_ok=True) + with open(os.path.join(folder, "schema.json"), "w", encoding="utf-8") as f: + json.dump({"schema": schema, "mapping": mapping}, f, ensure_ascii=False, indent=2) + + +def answer_schema_question(question: str) -> str: schema = get_schema_with_examples() - question = question.lower() - if "tabelas" in question: - return "As tabelas disponíveis são: " + ", ".join([t['table'] for t in schema]) + q_lower = question.lower() + + if "tabelas" in q_lower: + return "As tabelas disponíveis são: " + ", ".join(t["table"] for t in schema) + for t in schema: - if t['table'].lower() in question: - cols = ", ".join([c['name'] for c in t['columns']]) - return f"A tabela {t['table']} tem as colunas: {cols}.\nExemplo: {t['example']}" + if t["table"].lower() in q_lower: + cols = ", ".join(c["name"] for c in t["columns"]) + exemplo_pt = translate_record(t["example"], column_mapping_inv) + return ( + f"A tabela {t['table']} tem as colunas: {cols}.\n" + f"Exemplo traduzido: {dicts_to_markdown(exemplo_pt)}" + ) + return "Não consegui encontrar informação sobre essa tabela ou coluna." -@app.route('/api/schema-exemplo', methods=['GET']) +@app.route("/api/schema-exemplo", methods=["GET"]) def schema_exemplo(): - schema_info = get_schema_with_examples() - return jsonify({'schema': schema_info}) + return jsonify({"schema": get_schema_with_examples()}) -@app.route('/api/schema', methods=['GET']) + +@app.route("/api/schema", methods=["GET"]) def schema(): schema_info = get_db_schema() save_schema_to_file(schema_info, column_mapping) - return jsonify({'schema': schema_info, 'mapping': column_mapping}) + return jsonify({"schema": schema_info, "mapping": column_mapping}) -@app.route('/api/chat', methods=['POST']) + +@app.route("/api/chat", methods=["POST"]) def chat(): data = request.json - user_input = data.get('message', '').lower() - - if "tabela" in user_input or "coluna" in user_input: - resposta = answer_schema_question(user_input) - return jsonify({'reply': resposta}) - - if "colunas" in user_input and ("fatura" in user_input or "cunitbills" in user_input): - schema_info = get_db_schema() - for table in schema_info: - if table['table'].lower() == 'cunitbills': - colunas = [col['name'] for col in table['columns']] - return jsonify({'reply': "As colunas da tabela de faturas são:\n" + ", ".join(colunas)}) - return jsonify({'reply': "Tabela de faturas não encontrada no banco de dados."}) + user_input = data.get("message", "") + lower_input = user_input.lower() - cunit_id, date_billling_begin, date_billing_end, total_requested = parse_user_input(user_input) + if "tabela" in lower_input or "coluna" in lower_input: + return make_reply(answer_schema_question(lower_input)) + + if "colunas" in lower_input and ("fatura" in lower_input or "cunitbills" in lower_input): + for table in get_db_schema(): + if table["table"].lower() == "cunitbills": + return make_reply("As colunas da tabela de faturas são:\n" + ", ".join(c["name"] for c in table["columns"])) + return make_reply("Tabela de faturas não encontrada.") + + cunit_id, date_begin, date_end, total_requested = parse_user_input(lower_input) if total_requested and cunit_id: - data = get_total_by_cunit(cunit_id) - return jsonify({'reply': f"Aqui estão os totais encontrados:\n{data}"}) + return make_reply(translate_any(get_total_by_cunit(cunit_id), column_mapping_inv)) - if cunit_id or date_billling_begin or date_billing_end: - data = get_filtered_data(cunit_id, date_billling_begin, date_billing_end) - return jsonify({'reply': f"Aqui estão os dados encontrados:\n{data}"}) + if cunit_id or date_begin or date_end: + return make_reply(translate_any(get_filtered_data(cunit_id, date_begin, date_end), column_mapping_inv)) - if "preços faturados" in user_input.lower(): - data = get_price_comparison() - return jsonify({'reply': f"Aqui está a comparação dos preços:\n{data}"}) + if "preços faturados" in lower_input: + return make_reply(translate_any(get_price_comparison(), column_mapping_inv)) - if re.search(r"mês atual.*igual período.*ano anterior", user_input.lower()): - data = compare_current_vs_previous_year() - return jsonify({'reply': data}) + if re.search(r"mês atual.*igual período.*ano anterior", lower_input): + return make_reply(translate_any(compare_current_vs_previous_year(), column_mapping_inv)) - if re.search(r"mês.*igual período.*ano anterior", user_input.lower()): - match = re.search( - r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?", - user_input.lower() - ) - if match: - mes_input = match.group(1).strip().lower() - ano = int(match.group(2)) if match.group(2) else datetime.now().year - if mes_input.isdigit(): - mes = int(mes_input) - else: - mes = month_map.get(mes_input) + if re.search(r"mês.*igual período.*ano anterior", lower_input): + mt = re.search(r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?", lower_input) + if mt: + mes_txt = mt.group(1).strip().lower() + ano = int(mt.group(2)) if mt.group(2) else datetime.now().year + mes = int(mes_txt) if mes_txt.isdigit() else month_map.get(mes_txt) if not mes: - return jsonify({'reply': "Mês não reconhecido. Tenta novamente."}) + return make_reply("Mês não reconhecido.") else: - mes = datetime.now().month - ano = datetime.now().year - data = compare_current_vs_previous_year(month=mes, year=ano) - return jsonify({'reply': data}) + mes, ano = datetime.now().month, datetime.now().year + return make_reply(translate_any(compare_current_vs_previous_year(month=mes, year=ano), column_mapping_inv)) - if "homólogo" in user_input.lower(): - match = re.search(r"homólogo.*?(\d{4})", user_input.lower()) - ano = int(match.group(1)) if match else None - data = get_top_consumers(current=False, year=ano) - if ano: - return jsonify({'reply': f"Aqui estão as instalações com maior consumo no período homólogo de {ano}:\n{data}"}) - else: - return jsonify({'reply': f"Aqui estão as instalações com maior consumo no período homólogo atual:\n{data}"}) + if "homólogo" in lower_input: + mt = re.search(r"homólogo.*?(\d{4})", lower_input) + ano = int(mt.group(1)) if mt else None + cabec = f"no período homólogo de {ano}" if ano else "no período homólogo atual" + return make_reply(f"Aqui estão as instalações com maior consumo {cabec}:\n" + dicts_to_markdown(translate_any(get_top_consumers(current=False, year=ano), column_mapping_inv))) - if re.search(r"total de kwh.*mês.*ano anterior", user_input.lower()): - match = re.search( - r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?", - user_input.lower() - ) - if match: - mes_input = match.group(1).strip().lower() - ano = int(match.group(2)) if match.group(2) else datetime.now().year - if mes_input.isdigit(): - mes = int(mes_input) - else: - mes = month_map.get(mes_input) + if re.search(r"total de kwh.*mês.*ano anterior", lower_input): + mt = re.search(r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?", lower_input) + if mt: + mes_txt = mt.group(1).strip().lower() + ano = int(mt.group(2)) if mt.group(2) else datetime.now().year + mes = int(mes_txt) if mes_txt.isdigit() else month_map.get(mes_txt) if not mes: - return jsonify({'reply': "Mês não reconhecido. Tenta novamente."}) + return make_reply("Mês não reconhecido.") else: - mes = datetime.now().month - ano = datetime.now().year - data = compare_kwh_current_vs_previous_year(month=mes, year=ano) - return jsonify({'reply': data}) + mes, ano = datetime.now().month, datetime.now().year + return make_reply(translate_any(compare_kwh_current_vs_previous_year(month=mes, year=ano), column_mapping_inv)) - if re.search(r"quantas faturas.*mês", user_input.lower()): - match = re.search( - r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?", - user_input.lower() - ) - if match: - mes_input = match.group(1).strip().lower() - ano = int(match.group(2)) if match.group(2) else datetime.now().year - if mes_input.isdigit(): - mes = int(mes_input) - else: - mes = month_map.get(mes_input) + if re.search(r"quantas faturas.*mês", lower_input): + mt = re.search(r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?", lower_input) + if mt: + mes_txt = mt.group(1).strip().lower() + ano = int(mt.group(2)) if mt.group(2) else datetime.now().year + mes = int(mes_txt) if mes_txt.isdigit() else month_map.get(mes_txt) if not mes: - return jsonify({'reply': "Mês não reconhecido. Tenta novamente."}) + return make_reply("Mês não reconhecido.") else: - mes = datetime.now().month - ano = datetime.now().year - data = get_invoices_by_month_year(month=mes, year=ano) - return jsonify({'reply': data}) + mes, ano = datetime.now().month, datetime.now().year + return make_reply(translate_any(get_invoices_by_month_year(month=mes, year=ano), column_mapping_inv)) - if re.search(r"faturas.*instalações.*inativas", user_input.lower()): - data = get_invoices_from_inactive_units() - return jsonify({'reply': data}) + if re.search(r"faturas.*instalações.*inativas", lower_input): + return make_reply(translate_any(get_invoices_from_inactive_units(), column_mapping_inv)) - if re.search(r"total de kwh.*tipo de edifícios", user_input.lower()): - match = re.search(r"tipo de edifícios\s+([a-zçãõáéíóúâêîôûäëïöü\s]+)", user_input.lower()) - building_type = match.group(1).strip() if match else None - if building_type: - data = get_total_kwh_by_building_type(building_type=building_type) - return jsonify({'reply': f"Aqui está o total de kWh para o tipo de edifício '{building_type}':\n{data}"}) - else: - data = get_total_kwh_by_building_type() - return jsonify({'reply': f"Aqui está o total de kWh por tipo de edifício:\n{data}"}) + if re.search(r"total de kwh.*tipo de edifícios", lower_input): + mt = re.search(r"tipo de edifícios\s+([a-zçãõáéíóúâêîôûäëïöü\s]+)", lower_input) + building = mt.group(1).strip() if mt else None + dados = get_total_kwh_by_building_type(building_type=building) if building else get_total_kwh_by_building_type() + pref = f"para o tipo de edifício '{building}'" if building else "por tipo de edifício" + return make_reply(f"Aqui está o total de kWh {pref}:\n" + dicts_to_markdown(translate_any(dados, column_mapping_inv))) - if "dados" in user_input.lower(): - data = get_data() - return jsonify({'reply': f"\nDados do SQL Server:\n{data}"}) + if "dados" in lower_input: + return make_reply(translate_any(get_data(), column_mapping_inv)) - response = chat_with_gpt(user_input) - return jsonify({'reply': response}) + return make_reply(chat_with_gpt(user_input)) -if __name__ == '__main__': - app.run(port=3000, debug=True) \ No newline at end of file +if __name__ == "__main__": + app.run(port=3000, debug=True)