# ============================================================
# analyzer_agent_gradio/app.py — Telegram Analyzer Agent (Via Replit Proxy)
# Mamba + GGUF LLM + python-telegram-bot + Gradio
# ============================================================
import os
import json
import asyncio
from datetime import datetime
import logging
import threading
from functools import wraps
from typing import List, Dict, Any, Optional
import gradio as gr
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
# LLM & telegram dependencies
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import telegram
from telegram.error import TelegramError, BadRequest
# Webhook/handler imports
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import requests  # needed to reach the Replit proxy bridge
# ---------------- Logging ----------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger("analyzer")
# ---------------- Env & config ----------------
TG_BOT_TOKEN = os.getenv("TG_BOT_TOKEN")
TG_CHANNEL = os.getenv("TG_CHANNEL")
# URL of the proxy (bridge) app hosted on Replit
REPLIT_PROXY_URL = os.getenv("REPLIT_PROXY_URL")
LOG_PATH = os.getenv("ANALYZER_LOG", "analyzer_log.json")
POSTS_LIMIT = int(os.getenv("ANALYZER_LIMIT", "80"))
MAMBA_MODEL_PATH = os.getenv("MAMBA_MODEL_PATH", "state-spaces/mamba-1.4b-hf")
# ---------------- Webhook Config ----------------
WEBHOOK_URL = os.getenv("WEBHOOK_URL")
WEBHOOK_PORT = int(os.getenv("WEBHOOK_PORT", "8443"))
LISTEN_ADDRESS = os.getenv("LISTEN_ADDRESS", "0.0.0.0")
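# Environment summary — required: TG_BOT_TOKEN, TG_CHANNEL, REPLIT_PROXY_URL;
# optional (defaults in parentheses): ANALYZER_LOG (analyzer_log.json),
# ANALYZER_LIMIT (80), MAMBA_MODEL_PATH (state-spaces/mamba-1.4b-hf),
# LLM_GGUF_PATH, WEBHOOK_URL, WEBHOOK_PORT (8443), LISTEN_ADDRESS (0.0.0.0),
# PORT_GRADIO (7860).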
# ---------------- Initialization Check ----------------
if not all([TG_BOT_TOKEN, TG_CHANNEL, REPLIT_PROXY_URL]):  # the proxy URL is required as well
log.error("Telegram Bot Token, Channel ID, or Replit Proxy URL are missing in environment variables.")
IS_SERVICE_READY = False
STATUS_MESSAGE = "❌ النظام غير جاهز: بيانات اعتماد البوت، معرف القناة، أو رابط الجسر مفقودة."
else:
IS_SERVICE_READY = True
STATUS_MESSAGE = "✅ النظام جاهز للتحليل (Bot API via Replit Proxy)."
# ---------------- Helpers for Non-Async Blocking Operations ----------------
def async_wrap_blocking(func):
"""Wraps a synchronous function to be run in a separate thread."""
@wraps(func)
async def wrapper(*args, **kwargs):
return await asyncio.to_thread(func, *args, **kwargs)
return wrapper
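# Usage sketch (illustrative; `slow_io` is a hypothetical example, not part
# of this module):
#
#   @async_wrap_blocking
#   def slow_io():
#       ...  # blocking work
#
#   result = await slow_io()  # now awaitable; runs in a worker thread via asyncio.to_thread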
# ---------------- Load Mamba ----------------
log.info("Loading Mamba model...")
try:
mamba_tok = AutoTokenizer.from_pretrained(MAMBA_MODEL_PATH)
mamba_model = AutoModelForCausalLM.from_pretrained(
MAMBA_MODEL_PATH,
torch_dtype=torch.float16,
low_cpu_mem_usage=True
)
except Exception as e:
log.error(f"Failed to load Mamba model: {e}")
mamba_tok, mamba_model = None, None
IS_SERVICE_READY = False
STATUS_MESSAGE = "❌ فشل تحميل نموذج Mamba."
# ---------------- Load GGUF LLM (Zephyr 7B) ----------------
log.info("Loading LLM interpreter (GGUF + llama.cpp)...")
llm: Optional[Llama] = None
try:
LLM_GGUF_REPO = "TheBloke/zephyr-7B-beta-GGUF"
LLM_GGUF_FILE = "zephyr-7b-beta.Q6_K.gguf"
LLM_LOCAL_PATH = os.getenv("LLM_GGUF_PATH", f"./{LLM_GGUF_FILE}")
if not os.path.exists(LLM_LOCAL_PATH):
log.info("Downloading GGUF model from HuggingFace...")
LLM_LOCAL_PATH = hf_hub_download(
repo_id=LLM_GGUF_REPO,
filename=LLM_GGUF_FILE
)
llm = Llama(
model_path=LLM_LOCAL_PATH,
n_ctx=4096,
n_threads=4,
n_gpu_layers=0
)
log.info("GGUF model loaded successfully.")
except Exception as e:
log.error(f"Failed to load GGUF model: {e}")
IS_SERVICE_READY = False
STATUS_MESSAGE = "❌ فشل تحميل نموذج GGUF."
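# GPU note: n_gpu_layers=0 above keeps the GGUF model entirely on CPU. If
# llama-cpp-python was built with GPU support, layers can be offloaded by
# raising it, e.g. Llama(model_path=..., n_gpu_layers=35).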
# ---------------- Telegram Bot Client (for Webhook handlers) ----------------
# No standalone Bot client is needed here: python-telegram-bot's
# Application.builder() (see the main entry point) constructs the bot from
# TG_BOT_TOKEN and validates the token when the application starts.
# ---------------- Core Helpers ----------------
def save_log(entry: Dict[str, Any]):
"""Saves the analysis entry to a JSON log file."""
logs = []
if os.path.exists(LOG_PATH):
try:
with open(LOG_PATH, "r", encoding="utf-8") as f:
logs = json.load(f)
except Exception:
logs = []
logs.insert(0, entry)
with open(LOG_PATH, "w", encoding="utf-8") as f:
json.dump(logs, f, ensure_ascii=False, indent=2)
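# Illustrative shape of a stored entry (example values, not real data):
#   {"time": "2024-01-01T12:00:00", "posts_count": 1,
#    "channel_title": "My Channel", "channel_members": 1234, "advice": "..."}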
def encode_stats_for_mamba(posts: List[Dict[str, Any]]) -> str:
"""Encodes the list of posts into a single string for Mamba analysis."""
lines = []
for p in posts:
if p.get('chat_id'):
line = (
f"Channel Name:{p.get('title')} | Members:{p.get('members')} | "
f"Admins:{p.get('admins_count')}"
)
lines.append(line)
break
if not lines:
return "No sufficient data for sequential analysis. Analyzing overall channel status."
return "\n".join(lines)
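# Illustrative output for a single stats record (example values):
#   "Channel Name:My Channel | Members:1234 | Admins:5"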
@async_wrap_blocking
def run_mamba(text: str) -> str:
"""Synchronous Mamba generation function."""
if mamba_model is None or mamba_tok is None:
return "Error: Mamba model not loaded."
inp = mamba_tok(text, return_tensors="pt")
with torch.no_grad():
out = mamba_model.generate(**inp, max_new_tokens=64, do_sample=False)
return mamba_tok.decode(out[0], skip_special_tokens=True)
@async_wrap_blocking
def interpret_with_llm(mamba_output: str) -> str:
"""Synchronous LLM interpretation function using llama.cpp."""
if llm is None:
return "Error: LLM model not loaded."
prompt = (
"هذه نتائج تحليل إحصائي لقناة تلغرام (معلومات عامة فقط):\n"
f"{mamba_output}\n\n"
"حلل أداء القناة الأساسي (عدد الأعضاء والمشرفين).\n"
"استخرج:\n"
"- نقاط القوة (مثل عدد الأعضاء)\n"
"- نقاط الضعف (مثل نقص البيانات الزمنية أو التفاعلات)\n"
"- استراتيجيات لزيادة الاشتراكات والتفاعل\n"
"اكتب التحليل بالعربية وبشكل مرتب ومختصر، مع ذكر القيود على البيانات المتاحة."
)
res = llm(
prompt,
max_tokens=250,
temperature=0.3,
top_p=0.95
)
return res["choices"][0]["text"].strip()
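# Note: llama-cpp-python returns an OpenAI-style completion dict, e.g.
#   {"choices": [{"text": "...", "finish_reason": ...}], ...}
# which is why the interpretation is read from res["choices"][0]["text"].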
# ---------------- FETCH TELEGRAM STATS (VIA REPLIT PROXY) ----------------
async def fetch_telegram_stats(limit: int = POSTS_LIMIT) -> List[Dict[str, Any]]:
"""
Fetches general channel statistics by routing requests through the Replit Proxy.
"""
if not IS_SERVICE_READY or not REPLIT_PROXY_URL:
raise RuntimeError(STATUS_MESSAGE)
async def fetch_via_proxy(method: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Sends a request to the Replit proxy and returns the Telegram response result."""
url = f"{REPLIT_PROXY_URL}/route_telegram/{method}"
try:
            # Run the blocking requests call in a worker thread so the event loop is not blocked
response = await asyncio.to_thread(
requests.post,
url,
json=data if data is not None else {},
timeout=30
)
response.raise_for_status()
json_response = response.json()
if not json_response.get('ok'):
                # Surface a Telegram API error when the proxied response reports failure
error_description = json_response.get('description', 'Unknown API Error')
raise requests.exceptions.HTTPError(
f"Telegram API Error via Proxy: {error_description}",
response=response
)
return json_response['result']
except requests.exceptions.HTTPError as e:
error_details = str(e)
log.error(f"Proxy/Telegram HTTP Error during {method}: {error_details}")
try:
error_data = e.response.json()
error_msg = error_data.get("description", error_details)
            except Exception:
error_msg = error_details
if 'unauthorized' in error_msg.lower() or 'forbidden' in error_msg.lower():
raise RuntimeError(f"فشل مصادقة Telegram (عبر الجسر): {error_msg}. تأكد من صحة TG_BOT_TOKEN في الجسر.")
else:
raise RuntimeError(f"خطأ في جلب البيانات عبر الجسر ({method}): {error_msg}.")
except Exception as e:
log.error(f"An unexpected error occurred during proxy fetch: {e}")
raise RuntimeError(f"خطأ غير متوقع في الاتصال بالجسر: {e}.")
    # --- 1. Fetch basic channel info (getChat) ---
chat_info_data = await fetch_via_proxy(
"getChat",
data={"chat_id": TG_CHANNEL}
)
    # --- 2. Fetch the administrator list (getChatAdministrators) ---
admins_list = await fetch_via_proxy(
"getChatAdministrators",
data={"chat_id": TG_CHANNEL}
)
admins_count = len(admins_list)
    # --- 3. Fetch the member count (getChatMemberCount) ---
    # getChat does not report a member count, so it is fetched with a separate
    # Bot API call (the proxy forwards arbitrary methods, as above); the
    # result is a plain integer.
    members_count = await fetch_via_proxy(
        "getChatMemberCount",
        data={"chat_id": TG_CHANNEL}
    )
    # Normalize the data into a single record for analysis
    stats = {
        "chat_id": chat_info_data.get('id'),
        "title": chat_info_data.get('title'),
        "members": members_count,
        "admins_count": admins_count,
        "description": chat_info_data.get('description'),
        "date": datetime.utcnow().isoformat(),
    }
log.info(f"Successfully fetched stats (via proxy) for channel {stats['title']} with {stats['members']} members.")
return [stats]
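# ------------------------------------------------------------
# Expected proxy contract (an assumption about the Replit bridge, inferred
# from fetch_via_proxy above — verify against your actual bridge code):
# POST {REPLIT_PROXY_URL}/route_telegram/<method> forwards the JSON body to
# https://api.telegram.org/bot<TOKEN>/<method> and returns Telegram's JSON
# response unchanged. A minimal Flask sketch of such a bridge:
#
#   from flask import Flask, request, jsonify
#   import os, requests
#
#   app = Flask(__name__)
#   TOKEN = os.environ["TG_BOT_TOKEN"]
#
#   @app.post("/route_telegram/<method>")
#   def route_telegram(method):
#       r = requests.post(
#           f"https://api.telegram.org/bot{TOKEN}/{method}",
#           json=request.get_json(silent=True) or {},
#           timeout=30,
#       )
#       return jsonify(r.json()), r.status_code
# ------------------------------------------------------------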
# ---------------- Main Analysis Pipeline ----------------
async def run_analysis_pipeline(limit: int = POSTS_LIMIT) -> Dict[str, Any]:
"""Runs the full analysis pipeline."""
log.info("Running manual analysis job...")
    # 1. Fetch the channel stats
try:
posts = await fetch_telegram_stats(limit=limit)
except RuntimeError as e:
log.warning(f"Failed to fetch stats: {e}")
error_detail = str(e)
entry = {"time": datetime.utcnow().isoformat(), "error": f"fetch_error: {error_detail}"}
save_log(entry)
return {"status": "error", "message": error_detail, "log_entry": entry}
if not posts:
log.warning("No channel stats found for analysis.")
entry = {"time": datetime.utcnow().isoformat(), "error": "no_stats"}
save_log(entry)
return {"status": "warning", "message": "لم يتم العثور على إحصائيات للقناة.", "log_entry": entry}
    # 2. Run Mamba over the encoded stats
stats_text = encode_stats_for_mamba(posts)
mamba_out = await run_mamba(stats_text)
    # 3. Interpret Mamba's output with the LLM
interpretation = await interpret_with_llm(mamba_out)
entry = {
"time": datetime.utcnow().isoformat(),
"posts_count": 1,
"channel_title": posts[0].get('title'),
"channel_members": posts[0].get('members'),
"advice": interpretation
}
save_log(entry)
log.info("Analysis saved.")
    # Format the output message
output_message = (
f"**✅ اكتمل التحليل بنجاح!**\n\n"
f"**القناة:** {posts[0].get('title')}\n"
f"**عدد الأعضاء:** {posts[0].get('members')}\n"
f"**وقت التحليل:** {entry['time']}\n\n"
f"--- **توصيات الذكاء الاصطناعي** ---\n"
f"{interpretation}\n\n"
f"--- **إخراج Mamba الخام** ---\n"
f"`{mamba_out.strip()}`"
)
return {"status": "success", "message": output_message, "log_entry": entry}
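# Return shape, consumed by both the /analyze handler and the Gradio tab:
#   {"status": "success" | "warning" | "error", "message": str, "log_entry": dict}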
# ---------------- Telegram Webhook Handlers ----------------
async def start_command(update: telegram.Update, context: CallbackContext) -> None:
    """Sends a greeting message."""
    # Guard on update.message (not just effective_chat), since reply_text
    # needs an actual message to reply to.
    if update.message:
        await update.message.reply_text(
            'مرحباً! أنا وكيل التحليل الآلي. أرسل الأمر /analyze لبدء تحليل قناة: '
            f'`{TG_CHANNEL}`'
        )
async def analyze_command(update: telegram.Update, context: CallbackContext) -> None:
"""Runs the analysis pipeline when the /analyze command is received."""
if update.effective_chat:
chat_id = update.effective_chat.id
await context.bot.send_message(chat_id=chat_id, text="**⏳ بدء التحليل...** جلب البيانات من Telegram وتشغيل نماذج الذكاء الاصطناعي.",
parse_mode=telegram.constants.ParseMode.MARKDOWN)
        # Run the analysis pipeline
result = await run_analysis_pipeline(limit=POSTS_LIMIT)
        # Send the result back to the user
response_message = result.get("message", "فشل التحليل بسبب خطأ غير معروف.")
await context.bot.send_message(chat_id=chat_id, text=response_message,
parse_mode=telegram.constants.ParseMode.MARKDOWN)
async def error_handler(update: object, context: CallbackContext) -> None:
"""Log the error and send a message to the user."""
log.error(f"Update {update} caused error {context.error}")
if isinstance(update, telegram.Update) and update.effective_message:
await update.effective_message.reply_text(f"حدث خطأ في معالجة طلبك: {context.error}")
# ---------------- Gradio Interface Functions ----------------
async def gradio_run_once(limit: int) -> str:
"""Gradio function to run the analysis manually."""
result = await run_analysis_pipeline(limit=limit)
if result.get("status") == "success":
return result["message"]
else:
return f"**⚠️ فشل التحليل:** {result['message']}"
def gradio_get_logs() -> str:
"""Gradio function to display logs."""
logs = []
if os.path.exists(LOG_PATH):
try:
with open(LOG_PATH, "r", encoding="utf-8") as f:
logs = json.load(f)
except Exception as e:
log.error(f"Error reading log file: {e}")
return "حدث خطأ أثناء قراءة ملف السجلات."
if not logs:
return "لا توجد سجلات تحليل متاحة."
output = "## 📄 سجلات التحليل الأخيرة\n"
for i, entry in enumerate(logs):
output += f"### {i+1}. تحليل بتاريخ {entry.get('time', 'N/A')}\n"
if entry.get('error'):
output += f"**❌ خطأ:** {entry['error']}\n"
else:
output += f"**✅ القناة:** {entry.get('channel_title', 'N/A')}\n"
output += f"**💡 النصيحة:**\n```\n{entry.get('advice', 'N/A')[:300]}...\n```\n"
output += "---\n"
return output
# ---------------- Scheduler ----------------
def daily_job_wrapper():
"""Synchronous wrapper to run the async job in the scheduler."""
log.info("Running scheduled analysis job via Gradio wrapper...")
try:
result = asyncio.run(run_analysis_pipeline())
log.info(f"Scheduled job completed. Status: {result.get('status')}")
except Exception as e:
log.error(f"Error in scheduled job: {e}")
# ---------------- Gradio Interface Definition ----------------
# Defined at module level so it can be launched later in a separate thread
with gr.Blocks(title="Telegram Channel Analyzer Agent") as demo:
gr.Markdown("# 🤖 وكيل تحليل قناة Telegram (Webhook/Scheduled)")
gr.Markdown(f"**حالة الخدمة:** {STATUS_MESSAGE}")
gr.Markdown(f"**القناة المستهدفة:** `{TG_CHANNEL}`")
gr.Markdown(f"**جسر Replit:** `{REPLIT_PROXY_URL}`")
gr.Markdown("---")
with gr.Tab("تشغيل التحليل يدوياً"):
gr.Markdown("## ⚙️ تشغيل لمرة واحدة - جلب إحصائيات القناة الأساسية")
limit_input = gr.Slider(
minimum=10,
maximum=300,
step=10,
value=POSTS_LIMIT,
label="الحد الأقصى للمنشورات (غير مستخدم مع Bot API)"
)
run_button = gr.Button("🚀 بدء تحليل معلومات القناة الآن")
output_textbox = gr.Markdown(label="نتيجة التحليل")
run_button.click(
fn=gradio_run_once,
inputs=[limit_input],
outputs=[output_textbox]
)
with gr.Tab("سجلات التحليل"):
gr.Markdown("## 📋 سجلات التحليل المحفوظة")
log_button = gr.Button("🔄 تحديث السجلات")
log_output = gr.Markdown(label="السجلات")
log_button.click(
fn=gradio_get_logs,
inputs=[],
outputs=[log_output]
)
demo.load(
fn=gradio_get_logs,
inputs=[],
outputs=[log_output]
)
# ---------------- Main Entry Point (Webhook/Polling) ----------------
if __name__ == "__main__":
if not IS_SERVICE_READY:
log.error("Service is not ready. Exiting.")
exit(1)
    # Create the Application object
application = Application.builder().token(TG_BOT_TOKEN).build()
    # Register command and error handlers
application.add_handler(CommandHandler("start", start_command))
application.add_handler(CommandHandler("analyze", analyze_command))
application.add_error_handler(error_handler)
    # 1. Run Gradio in a separate thread
def start_gradio():
log.info("Starting Gradio Interface in a separate thread...")
PORT_GRADIO = int(os.getenv("PORT_GRADIO", "7860"))
try:
demo.launch(server_name="0.0.0.0", server_port=PORT_GRADIO)
except Exception as e:
log.error(f"Failed to start Gradio: {e}")
threading.Thread(target=start_gradio, daemon=True).start()
    # 2. Start the scheduler for the periodic analysis job
scheduler.add_job(daily_job_wrapper, "cron", hour=18, minute=15)
scheduler.start()
log.info("Scheduler started for daily analysis.")
    # 3. Run in Webhook or Polling mode depending on configuration
if WEBHOOK_URL:
        # Webhook mode
        log.info(f"Setting Webhook URL to: {WEBHOOK_URL}")
        try:
            log.info(f"Starting Webhook server on {LISTEN_ADDRESS}:{WEBHOOK_PORT}...")
            # Derive the URL path component (if any) from WEBHOOK_URL
            url_path = WEBHOOK_URL.split('/')[-1] if WEBHOOK_URL.count('/') > 2 else ""
            # run_webhook registers the webhook itself via webhook_url; a bare
            # application.bot.set_webhook() call here would only create an
            # un-awaited coroutine under python-telegram-bot v20+.
            application.run_webhook(
                listen=LISTEN_ADDRESS,
                port=WEBHOOK_PORT,
                url_path=url_path,
                webhook_url=WEBHOOK_URL,
            )
except Exception as e:
log.error(f"Failed to start Webhook: {e}")
log.warning("Falling back to Polling mode.")
application.run_polling(poll_interval=1.0)
else:
        # Polling mode (for local testing)
log.warning("WEBHOOK_URL is missing. Starting in Polling mode for local/testing purposes.")
application.run_polling(poll_interval=1.0)