# app.py -- KC Robot AI V4.0 (Cloud Brain)
# Flask server: Chat (HF), TTS, STT, Telegram poller, REST API for the ESP32.
# Setup: set env HF_API_TOKEN, (optional) HF_MODEL, HF_TTS_MODEL, HF_STT_MODEL, TELEGRAM_TOKEN
# requirements: see requirements.txt

import os
import io
import time
import json
import threading
import logging
from typing import Optional, List, Tuple

import requests
from flask import Flask, request, jsonify, send_file, render_template_string

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("kcrobot.v4")

app = Flask(__name__)

# ====== Config from env / Secrets ======
HF_API_TOKEN = os.getenv("HF_API_TOKEN", "")
HF_MODEL = os.getenv("HF_MODEL", "google/flan-t5-large")
HF_TTS_MODEL = os.getenv("HF_TTS_MODEL", "facebook/tts_transformer-es-css10")
HF_STT_MODEL = os.getenv("HF_STT_MODEL", "openai/whisper-small")
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "")
PORT = int(os.getenv("PORT", os.getenv("SERVER_PORT", 7860)))

if not HF_API_TOKEN:
    # BUG FIX: this string literal was broken across a physical line break in
    # the original file (a SyntaxError) -- rejoined into a single literal.
    logger.warning("HF_API_TOKEN not set. Put HF_API_TOKEN in Secrets.")

HF_HEADERS = {"Authorization": f"Bearer {HF_API_TOKEN}"} if HF_API_TOKEN else {}

# ====== In-memory storage (simple) ======
# conversation: list of (user, bot) pairs
CONV: List[Tuple[str, str]] = []
# display lines for the ESP32 OLED (last few lines only)
DISPLAY_LINES: List[str] = []


def push_display(line: str, limit: int = 6) -> None:
    """Append *line* to the OLED display buffer, keeping at most *limit* lines."""
    global DISPLAY_LINES
    DISPLAY_LINES.append(line)
    if len(DISPLAY_LINES) > limit:
        DISPLAY_LINES = DISPLAY_LINES[-limit:]


# ====== HuggingFace helpers (REST inference) ======
def hf_text_generate(prompt: str, model: Optional[str] = None,
                     max_new_tokens: int = 256, temperature: float = 0.7) -> str:
    """Run text generation through the HF Inference API and return the text.

    Raises:
        RuntimeError: on any non-200 response from the API.
    """
    model = model or HF_MODEL
    url = f"/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2F%7Bmodel%7D"
    payload = {
        "inputs": prompt,
        "parameters": {"max_new_tokens": int(max_new_tokens),
                       "temperature": float(temperature)},
        "options": {"wait_for_model": True},
    }
    r = requests.post(url, headers=HF_HEADERS, json=payload, timeout=120)
    if r.status_code != 200:
        logger.error("HF text gen error %s: %s", r.status_code, r.text[:200])
        raise RuntimeError(f"HF text generation failed: {r.status_code}: {r.text}")
    data = r.json()
    # The inference API answers either with a list of dicts or a single dict;
    # handle both common shapes and fall back to the raw repr.
    if isinstance(data, list) and len(data) and isinstance(data[0], dict):
        return data[0].get("generated_text", "") or str(data[0])
    if isinstance(data, dict) and "generated_text" in data:
        return data.get("generated_text", "")
    return str(data)


def hf_tts_get_mp3(text: str, model: Optional[str] = None) -> bytes:
    """Synthesize *text* via the HF Inference API; return the raw audio bytes.

    Raises:
        RuntimeError: on any non-200 response from the API.
    """
    model = model or HF_TTS_MODEL
    url = f"/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2F%7Bmodel%7D"
    headers = dict(HF_HEADERS)
    headers["Content-Type"] = "application/json"
    # FIX: removed stream=True -- the response was consumed whole via
    # r.content anyway, so streaming saved nothing and only deferred errors.
    r = requests.post(url, headers=headers, json={"inputs": text}, timeout=120)
    if r.status_code != 200:
        logger.error("HF TTS error %s: %s", r.status_code, r.text[:200])
        raise RuntimeError(f"HF TTS failed: {r.status_code}: {r.text}")
    return r.content


def hf_stt_from_bytes(audio_bytes: bytes, model: Optional[str] = None) -> str:
    """Transcribe raw audio bytes via the HF Inference API; return the text.

    Raises:
        RuntimeError: on any non-200 response from the API.
    """
    model = model or HF_STT_MODEL
    url = f"/static-proxy?url=https%3A%2F%2Fapi-inference.huggingface.co%2Fmodels%2F%7Bmodel%7D"
    headers = dict(HF_HEADERS)
    headers["Content-Type"] = "application/octet-stream"
    r = requests.post(url, headers=headers, data=audio_bytes, timeout=180)
    if r.status_code != 200:
        logger.error("HF STT error %s: %s", r.status_code, r.text[:200])
        raise RuntimeError(f"HF STT failed: {r.status_code}: {r.text}")
    j = r.json()
    # common shape: {"text": "..."}
    if isinstance(j, dict) and "text" in j:
        return j["text"]
    # fallback: stringify whatever came back
    return str(j)
# ====== Core endpoints for ESP32 ======
@app.route("/ask", methods=["POST"])
def api_ask():
    """ESP32 or web call: JSON {text, lang (opt)} -> returns {"answer": "..."}."""
    # robustness: get_json can yield None on an empty body; treat as empty dict
    data = request.get_json(force=True) or {}
    text = data.get("text", "").strip()
    lang = data.get("lang", "auto")
    if not text:
        return jsonify({"error": "no text"}), 400
    # build instructive prompt to encourage clear Vietnamese/English responses
    if lang == "vi":
        prompt = ("Bạn là trợ lý thông minh, trả lời bằng tiếng Việt, "
                  "rõ ràng và ngắn gọn:\n\n" + text)
    elif lang == "en":
        prompt = "You are a helpful assistant. Answer in clear English, concise:\n\n" + text
    else:
        # auto: bilingual system instruction.  BUG FIX: this literal was split
        # across a physical line break in the original file -- rejoined.
        prompt = ("Bạn là trợ lý thông minh song ngữ (Vietnamese/English). "
                  "Trả lời bằng ngôn ngữ phù hợp với câu hỏi.\n\n" + text)
    try:
        ans = hf_text_generate(prompt)
    except Exception as e:
        logger.exception("ask failed")
        return jsonify({"error": str(e)}), 500
    # store conversation and push truncated lines to the OLED buffer
    CONV.append((text, ans))
    push_display("YOU: " + text[:40])
    push_display("BOT: " + ans[:40])
    return jsonify({"answer": ans})


@app.route("/tts", methods=["POST"])
def api_tts():
    """POST JSON {text: "..."} -> return audio/mpeg bytes (mp3 or wav)."""
    data = request.get_json(force=True) or {}
    text = data.get("text", "").strip()
    if not text:
        return jsonify({"error": "no text"}), 400
    try:
        audio = hf_tts_get_mp3(text)
    except Exception as e:
        logger.exception("tts failed")
        return jsonify({"error": str(e)}), 500
    return send_file(
        io.BytesIO(audio),
        mimetype="audio/mpeg",
        as_attachment=False,
        download_name="tts.mp3",
    )


@app.route("/stt", methods=["POST"])
def api_stt():
    """Accept raw audio bytes in the body OR multipart 'file'.

    Returns JSON {"text": "..."} with the transcription.
    """
    if "file" in request.files:
        audio_bytes = request.files["file"].read()
    else:
        audio_bytes = request.get_data()
    if not audio_bytes:
        return jsonify({"error": "no audio"}), 400
    try:
        text = hf_stt_from_bytes(audio_bytes)
    except Exception as e:
        logger.exception("stt failed")
        return jsonify({"error": str(e)}), 500
    # push transcription (truncated) to the OLED buffer
    push_display("UserAudio: " + text[:40])
    return jsonify({"text": text})
@app.route("/presence", methods=["POST"])
def api_presence():
    """ESP32 radar -> POST JSON {"event":"presence","note": "..."}.

    Server: stores a greeting, updates the OLED buffer, and (if configured)
    sends a Telegram alert.  Returns the greeting so the ESP can call /tts
    to download and play it.
    """
    data = request.get_json(force=True) or {}
    note = data.get("note", "Có người tới")
    # BUG FIX: this f-string was split across a physical line break in the
    # original file -- rejoined into a single literal.
    greeting = f"Xin chào! {note}"
    # store
    CONV.append(("__presence__", greeting))
    push_display("RADAR: " + note[:40])
    # Telegram notify (best-effort)
    if TELEGRAM_TOKEN:
        try:
            send_telegram_message(f"⚠️ Robot: Phát hiện người - {note}")
        except Exception:
            logger.exception("telegram notify failed")
    return jsonify({"greeting": greeting})


@app.route("/display", methods=["GET"])
def api_display():
    """ESP32 GET -> returns last display lines to show on the OLED."""
    return jsonify({"lines": DISPLAY_LINES, "conv_len": len(CONV)})


# ====== Web UI (simple mobile-friendly) ======
# NOTE(review): the original HTML markup was destroyed by line-mangling; only
# the visible text ("KC Robot AI V4.0", the page header, "Logs") survived.
# This is a minimal reconstruction -- confirm against the original template.
INDEX_HTML = """<!doctype html>
<html lang="vi">
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <title>KC Robot AI V4.0</title>
</head>
<body>
  <h2>🤖 KC Robot AI V4.0 — Cloud Brain</h2>
  <div>
    <input id="q" placeholder="Hỏi robot...">
    <button onclick="ask()">Send</button>
  </div>
  <h3>Logs</h3>
  <pre id="log"></pre>
  <script>
    async function ask() {
      const text = document.getElementById("q").value;
      if (!text) return;
      const r = await fetch("/ask", {
        method: "POST",
        headers: {"Content-Type": "application/json"},
        body: JSON.stringify({text: text})
      });
      const j = await r.json();
      const log = document.getElementById("log");
      log.textContent += "YOU: " + text + "\\nBOT: " + (j.answer || j.error) + "\\n";
      document.getElementById("q").value = "";
    }
  </script>
</body>
</html>
"""
@app.route("/", methods=["GET"])
def index():
    """Serve the simple web UI."""
    return render_template_string(INDEX_HTML)


# ====== Telegram integration (polling minimal) ======
def send_telegram_message(text: str):
    """Send *text* to the chat in env TELEGRAM_CHATID. Best-effort, never raises."""
    if not TELEGRAM_TOKEN:
        logger.warning("Telegram token not set")
        return
    chat_id = os.getenv("TELEGRAM_CHATID", "")
    if not chat_id:
        # robustness: Telegram rejects an empty chat_id anyway; skip the call
        logger.warning("TELEGRAM_CHATID not set; skipping Telegram notify")
        return
    url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    payload = {"chat_id": chat_id, "text": text}
    try:
        r = requests.post(url, json=payload, timeout=10)
        if not r.ok:
            logger.warning("Telegram send failed: %s %s", r.status_code, r.text)
    except Exception:
        logger.exception("send_telegram_message error")


def _tg_reply(base: str, chat_id, text: str) -> None:
    """Best-effort sendMessage reply to one chat; never raises."""
    try:
        requests.post(base + "/sendMessage",
                      json={"chat_id": chat_id, "text": text}, timeout=10)
    except Exception:
        logger.exception("tg reply failed")


def _tg_handle_text(base: str, chat_id, text: str) -> None:
    """Dispatch one incoming Telegram text command (/ask, /say, /status)."""
    low = text.lower()
    if low.startswith("/ask "):
        q = text[5:].strip()
        try:
            ans = hf_text_generate(q)
        except Exception as e:
            ans = f"[HF error] {e}"
        _tg_reply(base, chat_id, ans)
    elif low.startswith("/say "):
        tts_text = text[5:].strip()
        # synthesize mp3 and send it back as an audio message
        try:
            mp3 = hf_tts_get_mp3(tts_text)
            files = {"audio": ("reply.mp3", mp3, "audio/mpeg")}
            requests.post(base + "/sendAudio", files=files,
                          data={"chat_id": chat_id}, timeout=30)
        except Exception:
            logger.exception("tg say failed")
    elif low.startswith("/status"):
        _tg_reply(base, chat_id, "Robot brain running")
    else:
        # default help
        _tg_reply(base, chat_id, "Commands: /ask | /say | /status")


def telegram_poll_loop(server_url: str):
    """Long-poll getUpdates forever and dispatch incoming commands.

    *server_url* is accepted for call compatibility but is currently unused.
    """
    if not TELEGRAM_TOKEN:
        logger.info("No TELEGRAM_TOKEN -> telegram disabled")
        return
    logger.info("Starting Telegram poller")
    offset = None
    base = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}"
    while True:
        try:
            params = {"timeout": 30}
            if offset:
                params["offset"] = offset
            r = requests.get(base + "/getUpdates", params=params, timeout=35)
            if r.status_code != 200:
                time.sleep(2)
                continue
            for u in r.json().get("result", []):
                # acknowledge this update so it is not delivered again
                offset = u["update_id"] + 1
                msg = u.get("message") or {}
                chat_id = msg.get("chat", {}).get("id")
                text = (msg.get("text") or "").strip()
                if not text:
                    continue
                logger.info("TG msg %s: %s", chat_id, text)
                _tg_handle_text(base, chat_id, text)
        except Exception:
            logger.exception("telegram poll loop exception")
            # back off briefly so a persistent failure cannot spin the CPU
            time.sleep(3)


# ====== Background threads startup ======
_BG_LOCK = threading.Lock()
_BG_STARTED = False


def start_background():
    """Start the Telegram poller thread once; safe to call multiple times.

    BUG FIX: when run as a script, start_background() was invoked both from
    __main__ and again by the first-request hook, spawning two poller threads
    that each consumed getUpdates.  An idempotence guard prevents that.
    """
    global _BG_STARTED
    with _BG_LOCK:
        if _BG_STARTED:
            return
        _BG_STARTED = True
    if TELEGRAM_TOKEN:
        t = threading.Thread(target=telegram_poll_loop,
                             args=(f"http://127.0.0.1:{PORT}",), daemon=True)
        t.start()
        logger.info("Telegram poller started.")
    else:
        logger.info("Telegram not configured.")


# start background when the app begins serving requests
if hasattr(app, "before_first_request"):
    # Flask < 2.3
    @app.before_first_request
    def _startup():
        start_background()
else:
    # BUG FIX: Flask 2.3+ removed before_first_request (AttributeError at
    # import).  Fall back to a per-request hook; start_background is
    # idempotent, so repeated calls are harmless.
    @app.before_request
    def _startup():
        start_background()


# ====== run ======
if __name__ == "__main__":
    start_background()
    logger.info("Starting server on port %s", PORT)
    app.run(host="0.0.0.0", port=PORT)