From d823baed319037f1045705b008ef886fc933d568 Mon Sep 17 00:00:00 2001 From: Ricardo Cunha Date: Tue, 17 Jun 2025 01:47:36 +0100 Subject: [PATCH] =?UTF-8?q?ChatBot=20-=20Trandu=C3=A7=C3=B5es=20a=20funcio?= =?UTF-8?q?nar=20com=20a=20estrutura?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/__pycache__/main.cpython-313.pyc | Bin 38060 -> 38274 bytes ChatBot-Python/src/main.py | 14 +- ChatBot-Python/src/server.py | 413 ++++++++++-------- 3 files changed, 247 insertions(+), 180 deletions(-) diff --git a/ChatBot-Python/src/__pycache__/main.cpython-313.pyc b/ChatBot-Python/src/__pycache__/main.cpython-313.pyc index 700110c31ad56ad89dfa4f750493c22bdc9a2f49..db5ba9858ca8912441465184571951728038091b 100644 GIT binary patch delta 1947 zcmZXUYfuwc6vyw~&2F;UWJ$<|0Ku??x4h)3AQpLJ)$-V`qQKbj2*tue6KySDEb2Ju z)KR>)wbfd*KHBMvHq-W7>kG%njK-=XG9AD7L#I@oTBiNbvl~EZGvvet!4zD!{0Z(ni&`qV)34_^s_t1Wn(gQrYBwi0^uxe#i%dQrPqK3<9gMeQChl0Rzq3P?WGUL>GGLowFQf{=vj zP3zMTl24E@?;+u6xAS9NK`l}=9(JVTPLkos$LB(u9e>c6CL^zjc!>C1u})f{Ga~6w zH{|Wb9;DF3SDlbXA=RMZd8aQ43Q-CLZO{sW)lkT&jA9^CqL`~NGNWKHGKF*zE~X+0 zCDa(dD1?0$A}GwWCLzl=tHcZk+Q0^>JQSoHsKJQPpkFpJkgADV&mqLac~zszg7|!z zNSYYF*|eaBJ5x<<{MQ8zyLko@*pW zOD%Jx`3136F)r#6s;I1NR#-SW-?~@pRhth~h>BDQQY!mg-Ml#~t+J|V-maPzGr7?3 zfT_Ixf`^R9IU}^R*Ak>8xD1u`!>W^Mf#z*`Dw02tPR{k(oV;vRcxZtNQi(`LDqqRN zFd=PB%G}nE{b&`Pi2RW`YZY$|Fmtuf$8%U9*W>GXleAQ-YiV3B4X2P4cZS_2HMZ2$ zwn$Z*q&3}ArCd#h-0xW})8=iy&i%yzM!?8W&hlFJJyrC%^+kA+bvc3-l@I#W3^Kye53GUI3nH0oX51RLx4_Ib6B<4I_0_Nw8vyWWJRgGt- z9cyMEu_$pPo_$PHxzSIZpgYj<_Y)7fxg?EE)T78ujz|QLac3$zU1_fs=?ld|O$B?A z7J)CiJhcY)VsRYsCBa!MSS}@twOq`lI83 z(^186vzhcXqhEg@mkfEkaTdAf^-6(Ea0PNv)e(}ids)0E zG+MBi#)TxRSIt=muJp%Y6E$5LBo63!O@Q*F{XGA~K z^sW)FBzs#=v-O(!zV-rj{*ko=J%oJ;&8ow%4>8_C6M$zLO7 zz=gXCa4GTcI)j^uW%rL*3EkK|XeftSo4dBFecL+w`ul^^tj@tXn>N^BCe|lEY&X~V zx(9<&w@-q{(bMmfNZF|5g7|6$;;XdymwpxDh9IJl^o;i7g3!;SEf_N-b$^!G4u^q1 z6znCN_t!Hn?0ixXI=g?ATjspayKnREJN%MxQvDsi{(-|iK789zFk17#Vh@XH-HxtZGj<=2&Q!gB;7uZyqH&zIk++D&Bs)1ztR^`~ zK`BZXlDCQ7)K21{x`)(+MTyUE(!$ zDRklA@Lqj2v!{?DXJO7-4%;{mrbHfMt|HI5=Mr|*$TVTYIYJ8Rl)5WnqpJ$9fc>rp zd{MvT`h)c{56WhA(#X>L?hp(0K`)%|Zt|+AI^U5hM~E;gwj;teLLgZZQaQq@h-$8p zP2i^dC9}1F7%X$(4RYy$EUIW^3LF+H6~-fl>IP%I}giQI#GV9i>J^_=LE9szlcm>8fmg)rc{O zNJeo$6&^7}XKFmLs62fkd0@9;jBX%D%{d;J%P<GD$L`}f@ zoi2D~L$G$#nqd_B#u`6Dqe5J)T#3`jhSbO6w-&=^>?sv?=hJ z|21rdXUo(z#>OiDUTF{2^U~;WcI!|on@TrO8KigSC3=_+q93iElg$T{KUp2O9LWXF z1HYGfg}LzdIo}{S10l~`xM$AS`zn8{u=sWRjQx$>Hw%~EhMqtx9E_-eAhP?9ZCR*B z7Tan0xO}Q$!Ghe8zn0MSLN)vusj!q#JhkQiy$XnyFB3u!3aJ;|pp}aUg6Lw4wcXBM ziP@8@gey;^YY-NHZ+zUo76J%!c;1q+L=j*X#t#J57dR5 zu&SrR2Lyan|0RBe#c|lN{BPU@!z(u83;LHUZg5s(Oqw2Fl#asb=0>dRv(4N1N7B*O z7W{)g)%vOBk#v1m9N*OkyY_HP|HoQ>reRXZPG(c|FgCQ7fVa-58$j#{WF7~Fz)>EnZE1zZQh_&R(}?;Bq$6#mDc;j$hsOs&PA>%UI5iI3OnyZq3* zuM*$Xcki2#a4U$1rs+5C#GwR^z~w{RaRMr)qj){^O|Pc>@^qZ;^yTUI@CF!u@hd!_ zS00|^;Jagrm05qsP1mcz={N|-CvbzFIo^*khU+KlWE(yE^oKHWxPPLPFU6jFEA`Hk i$1K2|S%V3*ovCwU?nWXyBFNvA#YU{sx65K9PU# dict: + """Traduz as chaves técnicas de um dict para PT usando mapping_inv.""" + return {mapping_inv.get(k.lower(), k): v for k, v in record.items()} + + +def translate_any(data, mapping_inv: dict): + """Traduz dict ou lista de dicts; mantém outros tipos intactos.""" + if isinstance(data, list): + return [translate_record(row, mapping_inv) for row in data] + if isinstance(data, dict): + return translate_record(data, mapping_inv) + return data + + +def dicts_to_markdown(rows): + """Converte dict(s) em string Markdown.""" + if isinstance(rows, dict): + rows = [rows] + if isinstance(rows, list) and rows and isinstance(rows[0], dict): + return "\n".join( + " - ".join(f"**{k}**: {v}" for k, v in row.items()) for row in rows + ) + return str(rows) + + +def translate_text_block(text: str, mapping_inv: dict) -> str: + """Substitui nomes técnicos em blocos de texto por PT (case‑insensitive).""" + for eng, pt in sorted(mapping_inv.items(), key=lambda x: -len(x[0])): + pattern = re.compile(re.escape(eng), re.IGNORECASE) + text = pattern.sub(pt, text) + return text + + +def make_reply(content): + """Aplica tradução final se for string, devolve dict jsonify‑ready.""" + if isinstance(content, (dict, list)): + content = dicts_to_markdown(content) + content = translate_text_block(content, column_mapping_inv) + return jsonify({"reply": content}) + def get_db_schema(): - conn = pyodbc.connect('DRIVER={SQL Server};SERVER=SEU_SERVIDOR;DATABASE=SEU_BANCO;UID=USUARIO;PWD=SENHA') + conn = pyodbc.connect( + "DRIVER={SQL Server};SERVER=SEU_SERVIDOR;DATABASE=SEU_BANCO;UID=USUARIO;PWD=SENHA" + ) cursor = conn.cursor() tables = [] - for row in cursor.tables(tableType='TABLE'): - table_name = row.table_name - columns = [] - for col in cursor.columns(table=table_name): - columns.append({'name': col.column_name, 'type': col.type_name}) - tables.append({'table': table_name, 'columns': columns}) + for row in cursor.tables(tableType="TABLE"): + cols = [ + {"name": col.column_name, "type": col.type_name} + for col in cursor.columns(table=row.table_name) + ] + tables.append({"table": row.table_name, "columns": cols}) conn.close() return tables -def save_schema_to_file(schema, mapping, folder='schema'): - os.makedirs(folder, exist_ok=True) - with open(os.path.join(folder, 'schema.json'), 'w', encoding='utf-8') as f: - json.dump({'schema': schema, 'mapping': mapping}, f, ensure_ascii=False, indent=2) -def answer_schema_question(question): +def save_schema_to_file(schema, mapping, folder="schema"): + os.makedirs(folder, exist_ok=True) + with open(os.path.join(folder, "schema.json"), "w", encoding="utf-8") as f: + json.dump({"schema": schema, "mapping": mapping}, f, ensure_ascii=False, indent=2) + + +def answer_schema_question(question: str) -> str: schema = get_schema_with_examples() - question = question.lower() - if "tabelas" in question: - return "As tabelas disponíveis são: " + ", ".join([t['table'] for t in schema]) + q_lower = question.lower() + + if "tabelas" in q_lower: + return "As tabelas disponíveis são: " + ", ".join(t["table"] for t in schema) + for t in schema: - if t['table'].lower() in question: - cols = ", ".join([c['name'] for c in t['columns']]) - return f"A tabela {t['table']} tem as colunas: {cols}.\nExemplo: {t['example']}" + if t["table"].lower() in q_lower: + cols = ", ".join(c["name"] for c in t["columns"]) + exemplo_pt = translate_record(t["example"], column_mapping_inv) + return ( + f"A tabela {t['table']} tem as colunas: {cols}.\n" + f"Exemplo traduzido: {dicts_to_markdown(exemplo_pt)}" + ) + return "Não consegui encontrar informação sobre essa tabela ou coluna." -@app.route('/api/schema-exemplo', methods=['GET']) +@app.route("/api/schema-exemplo", methods=["GET"]) def schema_exemplo(): - schema_info = get_schema_with_examples() - return jsonify({'schema': schema_info}) + return jsonify({"schema": get_schema_with_examples()}) -@app.route('/api/schema', methods=['GET']) + +@app.route("/api/schema", methods=["GET"]) def schema(): schema_info = get_db_schema() save_schema_to_file(schema_info, column_mapping) - return jsonify({'schema': schema_info, 'mapping': column_mapping}) + return jsonify({"schema": schema_info, "mapping": column_mapping}) -@app.route('/api/chat', methods=['POST']) + +@app.route("/api/chat", methods=["POST"]) def chat(): data = request.json - user_input = data.get('message', '').lower() - - if "tabela" in user_input or "coluna" in user_input: - resposta = answer_schema_question(user_input) - return jsonify({'reply': resposta}) - - if "colunas" in user_input and ("fatura" in user_input or "cunitbills" in user_input): - schema_info = get_db_schema() - for table in schema_info: - if table['table'].lower() == 'cunitbills': - colunas = [col['name'] for col in table['columns']] - return jsonify({'reply': "As colunas da tabela de faturas são:\n" + ", ".join(colunas)}) - return jsonify({'reply': "Tabela de faturas não encontrada no banco de dados."}) + user_input = data.get("message", "") + lower_input = user_input.lower() - cunit_id, date_billling_begin, date_billing_end, total_requested = parse_user_input(user_input) + if "tabela" in lower_input or "coluna" in lower_input: + return make_reply(answer_schema_question(lower_input)) + + if "colunas" in lower_input and ("fatura" in lower_input or "cunitbills" in lower_input): + for table in get_db_schema(): + if table["table"].lower() == "cunitbills": + return make_reply("As colunas da tabela de faturas são:\n" + ", ".join(c["name"] for c in table["columns"])) + return make_reply("Tabela de faturas não encontrada.") + + cunit_id, date_begin, date_end, total_requested = parse_user_input(lower_input) if total_requested and cunit_id: - data = get_total_by_cunit(cunit_id) - return jsonify({'reply': f"Aqui estão os totais encontrados:\n{data}"}) + return make_reply(translate_any(get_total_by_cunit(cunit_id), column_mapping_inv)) - if cunit_id or date_billling_begin or date_billing_end: - data = get_filtered_data(cunit_id, date_billling_begin, date_billing_end) - return jsonify({'reply': f"Aqui estão os dados encontrados:\n{data}"}) + if cunit_id or date_begin or date_end: + return make_reply(translate_any(get_filtered_data(cunit_id, date_begin, date_end), column_mapping_inv)) - if "preços faturados" in user_input.lower(): - data = get_price_comparison() - return jsonify({'reply': f"Aqui está a comparação dos preços:\n{data}"}) + if "preços faturados" in lower_input: + return make_reply(translate_any(get_price_comparison(), column_mapping_inv)) - if re.search(r"mês atual.*igual período.*ano anterior", user_input.lower()): - data = compare_current_vs_previous_year() - return jsonify({'reply': data}) + if re.search(r"mês atual.*igual período.*ano anterior", lower_input): + return make_reply(translate_any(compare_current_vs_previous_year(), column_mapping_inv)) - if re.search(r"mês.*igual período.*ano anterior", user_input.lower()): - match = re.search( - r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?", - user_input.lower() - ) - if match: - mes_input = match.group(1).strip().lower() - ano = int(match.group(2)) if match.group(2) else datetime.now().year - if mes_input.isdigit(): - mes = int(mes_input) - else: - mes = month_map.get(mes_input) + if re.search(r"mês.*igual período.*ano anterior", lower_input): + mt = re.search(r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?", lower_input) + if mt: + mes_txt = mt.group(1).strip().lower() + ano = int(mt.group(2)) if mt.group(2) else datetime.now().year + mes = int(mes_txt) if mes_txt.isdigit() else month_map.get(mes_txt) if not mes: - return jsonify({'reply': "Mês não reconhecido. Tenta novamente."}) + return make_reply("Mês não reconhecido.") else: - mes = datetime.now().month - ano = datetime.now().year - data = compare_current_vs_previous_year(month=mes, year=ano) - return jsonify({'reply': data}) + mes, ano = datetime.now().month, datetime.now().year + return make_reply(translate_any(compare_current_vs_previous_year(month=mes, year=ano), column_mapping_inv)) - if "homólogo" in user_input.lower(): - match = re.search(r"homólogo.*?(\d{4})", user_input.lower()) - ano = int(match.group(1)) if match else None - data = get_top_consumers(current=False, year=ano) - if ano: - return jsonify({'reply': f"Aqui estão as instalações com maior consumo no período homólogo de {ano}:\n{data}"}) - else: - return jsonify({'reply': f"Aqui estão as instalações com maior consumo no período homólogo atual:\n{data}"}) + if "homólogo" in lower_input: + mt = re.search(r"homólogo.*?(\d{4})", lower_input) + ano = int(mt.group(1)) if mt else None + cabec = f"no período homólogo de {ano}" if ano else "no período homólogo atual" + return make_reply(f"Aqui estão as instalações com maior consumo {cabec}:\n" + dicts_to_markdown(translate_any(get_top_consumers(current=False, year=ano), column_mapping_inv))) - if re.search(r"total de kwh.*mês.*ano anterior", user_input.lower()): - match = re.search( - r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?", - user_input.lower() - ) - if match: - mes_input = match.group(1).strip().lower() - ano = int(match.group(2)) if match.group(2) else datetime.now().year - if mes_input.isdigit(): - mes = int(mes_input) - else: - mes = month_map.get(mes_input) + if re.search(r"total de kwh.*mês.*ano anterior", lower_input): + mt = re.search(r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?", lower_input) + if mt: + mes_txt = mt.group(1).strip().lower() + ano = int(mt.group(2)) if mt.group(2) else datetime.now().year + mes = int(mes_txt) if mes_txt.isdigit() else month_map.get(mes_txt) if not mes: - return jsonify({'reply': "Mês não reconhecido. Tenta novamente."}) + return make_reply("Mês não reconhecido.") else: - mes = datetime.now().month - ano = datetime.now().year - data = compare_kwh_current_vs_previous_year(month=mes, year=ano) - return jsonify({'reply': data}) + mes, ano = datetime.now().month, datetime.now().year + return make_reply(translate_any(compare_kwh_current_vs_previous_year(month=mes, year=ano), column_mapping_inv)) - if re.search(r"quantas faturas.*mês", user_input.lower()): - match = re.search( - r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?", - user_input.lower() - ) - if match: - mes_input = match.group(1).strip().lower() - ano = int(match.group(2)) if match.group(2) else datetime.now().year - if mes_input.isdigit(): - mes = int(mes_input) - else: - mes = month_map.get(mes_input) + if re.search(r"quantas faturas.*mês", lower_input): + mt = re.search(r"(?:mês\s+de\s+([a-zç]+|\d{1,2}))(?:\s+do\s+ano\s+(\d{4}))?", lower_input) + if mt: + mes_txt = mt.group(1).strip().lower() + ano = int(mt.group(2)) if mt.group(2) else datetime.now().year + mes = int(mes_txt) if mes_txt.isdigit() else month_map.get(mes_txt) if not mes: - return jsonify({'reply': "Mês não reconhecido. Tenta novamente."}) + return make_reply("Mês não reconhecido.") else: - mes = datetime.now().month - ano = datetime.now().year - data = get_invoices_by_month_year(month=mes, year=ano) - return jsonify({'reply': data}) + mes, ano = datetime.now().month, datetime.now().year + return make_reply(translate_any(get_invoices_by_month_year(month=mes, year=ano), column_mapping_inv)) - if re.search(r"faturas.*instalações.*inativas", user_input.lower()): - data = get_invoices_from_inactive_units() - return jsonify({'reply': data}) + if re.search(r"faturas.*instalações.*inativas", lower_input): + return make_reply(translate_any(get_invoices_from_inactive_units(), column_mapping_inv)) - if re.search(r"total de kwh.*tipo de edifícios", user_input.lower()): - match = re.search(r"tipo de edifícios\s+([a-zçãõáéíóúâêîôûäëïöü\s]+)", user_input.lower()) - building_type = match.group(1).strip() if match else None - if building_type: - data = get_total_kwh_by_building_type(building_type=building_type) - return jsonify({'reply': f"Aqui está o total de kWh para o tipo de edifício '{building_type}':\n{data}"}) - else: - data = get_total_kwh_by_building_type() - return jsonify({'reply': f"Aqui está o total de kWh por tipo de edifício:\n{data}"}) + if re.search(r"total de kwh.*tipo de edifícios", lower_input): + mt = re.search(r"tipo de edifícios\s+([a-zçãõáéíóúâêîôûäëïöü\s]+)", lower_input) + building = mt.group(1).strip() if mt else None + dados = get_total_kwh_by_building_type(building_type=building) if building else get_total_kwh_by_building_type() + pref = f"para o tipo de edifício '{building}'" if building else "por tipo de edifício" + return make_reply(f"Aqui está o total de kWh {pref}:\n" + dicts_to_markdown(translate_any(dados, column_mapping_inv))) - if "dados" in user_input.lower(): - data = get_data() - return jsonify({'reply': f"\nDados do SQL Server:\n{data}"}) + if "dados" in lower_input: + return make_reply(translate_any(get_data(), column_mapping_inv)) - response = chat_with_gpt(user_input) - return jsonify({'reply': response}) + return make_reply(chat_with_gpt(user_input)) -if __name__ == '__main__': - app.run(port=3000, debug=True) \ No newline at end of file +if __name__ == "__main__": + app.run(port=3000, debug=True)