File size: 17,150 Bytes
6bdc0eb
a195de3
 
6bdc0eb
4766ecc
6bdc0eb
 
 
 
 
7a1d797
a9985aa
7289131
a195de3
 
6bdc0eb
7289131
c8f4ac6
 
7289131
a195de3
6bdc0eb
 
7a1d797
a195de3
 
6bdc0eb
 
 
 
 
 
 
 
 
 
 
7a1d797
d993f05
 
6bdc0eb
c0639e3
6bdc0eb
8cd8b10
6bdc0eb
7289131
a195de3
d993f05
7289131
d993f05
7289131
 
a195de3
d993f05
6bdc0eb
a9985aa
7289131
 
a9985aa
 
7289131
a9985aa
 
6bdc0eb
 
7289131
 
 
 
 
 
 
 
 
 
 
 
6bdc0eb
 
 
 
7289131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d993f05
6bdc0eb
7289131
 
 
 
 
6bdc0eb
 
7289131
 
 
6bdc0eb
 
 
 
 
 
 
 
 
 
 
7289131
4be75eb
7289131
 
4be75eb
c0639e3
 
 
 
 
7a1d797
c0639e3
 
7289131
 
 
 
 
7a1d797
7289131
 
 
6bdc0eb
 
 
 
 
7289131
 
7a1d797
7289131
 
 
6bdc0eb
c0639e3
6bdc0eb
c0639e3
 
 
 
6bdc0eb
c0639e3
6bdc0eb
 
 
 
 
 
 
 
 
 
 
 
d993f05
7a1d797
d993f05
 
 
 
 
7a1d797
7289131
d993f05
 
 
7289131
d993f05
 
 
 
 
 
725af30
d993f05
 
 
 
 
 
 
 
 
 
 
 
c0639e3
d993f05
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c0639e3
6bdc0eb
7289131
 
 
a195de3
a9985aa
7a1d797
a9985aa
7289131
 
 
 
 
a9985aa
7289131
6bdc0eb
 
c0639e3
 
6bdc0eb
c0639e3
6bdc0eb
7289131
6bdc0eb
a9985aa
 
7289131
a9985aa
6bdc0eb
 
 
7a1d797
c0639e3
 
6bdc0eb
 
 
708d1b5
 
7289131
4be75eb
7289131
 
c0639e3
 
7289131
 
 
 
4be75eb
7289131
 
 
6bdc0eb
 
7a1d797
6bdc0eb
7289131
 
 
 
 
 
 
 
 
6bdc0eb
7289131
 
 
6bdc0eb
7a1d797
 
 
 
 
 
 
7289131
 
 
 
 
 
 
 
 
c0639e3
7289131
 
 
 
6bdc0eb
 
7289131
 
 
 
a195de3
5edd04d
7289131
 
 
5edd04d
7289131
4be75eb
 
6bdc0eb
7289131
a195de3
7289131
c0639e3
d993f05
7289131
 
 
c0639e3
7289131
 
 
 
 
c0639e3
7289131
c0639e3
7289131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a195de3
88c7a02
4be75eb
 
 
 
a195de3
4be75eb
 
 
 
a195de3
 
4be75eb
a195de3
4be75eb
 
 
 
 
 
b872dab
4be75eb
 
 
a195de3
 
 
 
 
 
 
 
 
 
4be75eb
a195de3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
# ============================================================
# analyzer_agent_gradio/app.py — Telegram Analyzer Agent (Gradio/Scheduler Only)
# Uses Replit Proxy for Telegram API access.
# ============================================================
import huggingface_hub
import os
import json
import asyncio
from datetime import datetime
import logging
import threading
from functools import wraps
from typing import List, Dict, Any, Optional
import time # <<< إضافة مكتبة time للـ loop
import requests # <<< إضافة مكتبة requests للاتصال بالجسر

import gradio as gr
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()

# LLM & minimal telegram dependencies
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import telegram 
# لا نحتاج لـ telegram.ext هنا لأننا أزلنا الـ Handlers

from llama_cpp import Llama
from huggingface_hub import hf_hub_download


# ---------------- Logging ----------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger("analyzer")


# ---------------- Env & config ----------------
TG_BOT_TOKEN = os.getenv("TG_BOT_TOKEN")
TG_CHANNEL = os.getenv("TG_CHANNEL")
# المتغير الجديد: رابط تطبيق الجسر على Replit
REPLIT_PROXY_URL = os.getenv("REPLIT_PROXY_URL") 
LOG_PATH = os.getenv("ANALYZER_LOG", "analyzer_log.json")
POSTS_LIMIT = int(os.getenv("ANALYZER_LIMIT", "80")) 

MAMBA_MODEL_PATH = os.getenv("MAMBA_MODEL_PATH", "state-spaces/mamba-1.4b-hf")

# ---------------- Initialization Check ----------------
if not all([TG_BOT_TOKEN, TG_CHANNEL, REPLIT_PROXY_URL]):
    log.error("Telegram Bot Token, Channel ID, or Replit Proxy URL are missing in environment variables.")
    IS_SERVICE_READY = False
    STATUS_MESSAGE = "❌ النظام غير جاهز: بيانات اعتماد البوت، معرف القناة، أو رابط الجسر مفقودة."
else:
    IS_SERVICE_READY = True
    STATUS_MESSAGE = "✅ النظام جاهز للتحليل (Gradio & Scheduler)."


# ---------------- Helpers for Non-Async Blocking Operations ----------------
def async_wrap_blocking(func):
    """Wraps a synchronous function to be run in a separate thread."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        return await asyncio.to_thread(func, *args, **kwargs)
    return wrapper

# ---------------- Load Mamba ----------------
log.info("Loading Mamba model...")
try:
    mamba_tok = AutoTokenizer.from_pretrained(MAMBA_MODEL_PATH)
    mamba_model = AutoModelForCausalLM.from_pretrained(
        MAMBA_MODEL_PATH,
        torch_dtype=torch.float16,
        low_cpu_mem_usage=True
    )
except Exception as e:
    log.error(f"Failed to load Mamba model: {e}")
    mamba_tok, mamba_model = None, None
    IS_SERVICE_READY = False
    STATUS_MESSAGE = "❌ فشل تحميل نموذج Mamba."


# ---------------- Load GGUF LLM (Zephyr 7B) ----------------
log.info("Loading LLM interpreter (GGUF + llama.cpp)...")
llm: Optional[Llama] = None

try:
    LLM_GGUF_REPO = "TheBloke/zephyr-7B-beta-GGUF"
    LLM_GGUF_FILE = "zephyr-7b-beta.Q6_K.gguf"
    LLM_LOCAL_PATH = os.getenv("LLM_GGUF_PATH", f"./{LLM_GGUF_FILE}")

    if not os.path.exists(LLM_LOCAL_PATH):
        log.info("Downloading GGUF model from HuggingFace...")
        LLM_LOCAL_PATH = hf_hub_download(
            repo_id=LLM_GGUF_REPO,
            filename=LLM_GGUF_FILE
        )

    llm = Llama(
        model_path=LLM_LOCAL_PATH,
        n_ctx=4096,
        n_threads=4,
        n_gpu_layers=0 
    )
    log.info("GGUF model loaded successfully.")
except Exception as e:
    log.error(f"Failed to load GGUF model: {e}")
    IS_SERVICE_READY = False
    STATUS_MESSAGE = "❌ فشل تحميل نموذج GGUF."


# ---------------- Core Helpers ----------------
def save_log(entry: Dict[str, Any]):
    """Saves the analysis entry to a JSON log file."""
    logs = []
    if os.path.exists(LOG_PATH):
        try:
            with open(LOG_PATH, "r", encoding="utf-8") as f:
                logs = json.load(f)
        except Exception:
            logs = []
    logs.insert(0, entry)
    with open(LOG_PATH, "w", encoding="utf-8") as f:
        json.dump(logs, f, ensure_ascii=False, indent=2)

def encode_stats_for_mamba(posts: List[Dict[str, Any]]) -> str:
    """Encodes the list of posts into a single string for Mamba analysis."""
    lines = []
    for p in posts:
        if p.get('chat_id'): 
            line = (
                f"Channel Name:{p.get('title')} | Members:{p.get('members')} | "
                f"Admins:{p.get('admins_count')}"
            )
            lines.append(line)
            break 
    if not lines:
         return "No sufficient data for sequential analysis. Analyzing overall channel status."
    return "\n".join(lines)


@async_wrap_blocking
def run_mamba(text: str) -> str:
    """Synchronous Mamba generation function."""
    if mamba_model is None or mamba_tok is None:
        return "Error: Mamba model not loaded."
        
    inp = mamba_tok(text, return_tensors="pt")
    with torch.no_grad():
        out = mamba_model.generate(**inp, max_new_tokens=64, do_sample=False)
    return mamba_tok.decode(out[0], skip_special_tokens=True)

@async_wrap_blocking
def interpret_with_llm(mamba_output: str) -> str:
    """Synchronous LLM interpretation function using llama.cpp."""
    if llm is None:
        return "Error: LLM model not loaded."
        
    prompt = (
        "هذه نتائج تحليل إحصائي لقناة تلغرام (معلومات عامة فقط):\n"
        f"{mamba_output}\n\n"
        "حلل أداء القناة الأساسي (عدد الأعضاء والمشرفين).\n"
        "استخرج:\n"
        "- نقاط القوة (مثل عدد الأعضاء)\n"
        "- نقاط الضعف (مثل نقص البيانات الزمنية أو التفاعلات)\n"
        "- استراتيجيات لزيادة الاشتراكات والتفاعل\n"
        "اكتب التحليل بالعربية وبشكل مرتب ومختصر، مع ذكر القيود على البيانات المتاحة."
    )

    res = llm(
        prompt,
        max_tokens=250,
        temperature=0.3,
        top_p=0.95
    )

    return res["choices"][0]["text"].strip()


# ---------------- FETCH TELEGRAM STATS (VIA REPLIT PROXY) ----------------
async def fetch_telegram_stats(limit: int = POSTS_LIMIT) -> List[Dict[str, Any]]:
    """
    Fetches general channel statistics by routing requests through the Replit Proxy.
    """
    
    if not IS_SERVICE_READY or not REPLIT_PROXY_URL:
        raise RuntimeError(STATUS_MESSAGE)

    async def fetch_via_proxy(method: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Sends a request to the Replit proxy and returns the Telegram response result."""
        url = f"{REPLIT_PROXY_URL}/route_telegram/{method}"
        
        try:
            # نستخدم to_thread لجعل طلب requests متزامن (Blocking) يعمل في خيط منفصل
            response = await asyncio.to_thread(
                requests.post,
                url, 
                json=data if data is not None else {}, 
                timeout=200
            )
            response.raise_for_status() 
            
            json_response = response.json()
            if not json_response.get('ok'):
                # يتم إلقاء خطأ Telegram API إذا كان الرد سلبياً
                error_description = json_response.get('description', 'Unknown API Error')
                raise requests.exceptions.HTTPError(
                    f"Telegram API Error via Proxy: {error_description}",
                    response=response
                )
            return json_response['result']
        
        except requests.exceptions.HTTPError as e:
            error_details = str(e)
            log.error(f"Proxy/Telegram HTTP Error during {method}: {error_details}")
            try:
                error_data = e.response.json()
                error_msg = error_data.get("description", error_details)
            except:
                error_msg = error_details
            
            if 'unauthorized' in error_msg.lower() or 'forbidden' in error_msg.lower():
                raise RuntimeError(f"فشل مصادقة Telegram (عبر الجسر): {error_msg}. تأكد من صحة TG_BOT_TOKEN في الجسر.")
            else:
                raise RuntimeError(f"خطأ في جلب البيانات عبر الجسر ({method}): {error_msg}.")
        except Exception as e:
            log.error(f"An unexpected error occurred during proxy fetch: {e}")
            raise RuntimeError(f"خطأ غير متوقع في الاتصال بالجسر: {e}.")


    # --- 1. جلب معلومات القناة الأساسية (getChat) ---
    chat_info_data = await fetch_via_proxy(
        "getChat", 
        data={"chat_id": TG_CHANNEL}
    )

    # --- 2. جلب المشرفين (getChatAdministrators) ---
    admins_list = await fetch_via_proxy(
        "getChatAdministrators", 
        data={"chat_id": TG_CHANNEL}
    )
    
    admins_count = len(admins_list)
    
    # تحويل البيانات إلى تنسيق موحد للتحليل
    stats = {
        "chat_id": chat_info_data.get('id'),
        "title": chat_info_data.get('title'),
        "members": chat_info_data.get('members_count', 'N/A'), 
        "admins_count": admins_count,
        "description": chat_info_data.get('description'),
        "date": datetime.utcnow().isoformat(),
    }
    
    log.info(f"Successfully fetched stats (via proxy) for channel {stats['title']} with {stats['members']} members.")
    return [stats]


# ---------------- Main Analysis Pipeline ----------------
async def run_analysis_pipeline(limit: int = POSTS_LIMIT) -> Dict[str, Any]:
    """Runs the full analysis pipeline."""
    log.info("Running analysis job...")
    
    # 1. جلب البيانات
    try:
        posts = await fetch_telegram_stats(limit=limit)
    except RuntimeError as e:
        log.warning(f"Failed to fetch stats: {e}")
        error_detail = str(e)
        entry = {"time": datetime.utcnow().isoformat(), "error": f"fetch_error: {error_detail}"}
        save_log(entry)
        return {"status": "error", "message": error_detail, "log_entry": entry}

    if not posts:
        log.warning("No channel stats found for analysis.")
        entry = {"time": datetime.utcnow().isoformat(), "error": "no_stats"}
        save_log(entry)
        return {"status": "warning", "message": "لم يتم العثور على إحصائيات للقناة.", "log_entry": entry}

    # 2. تشغيل Mamba
    stats_text = encode_stats_for_mamba(posts)
    mamba_out = await run_mamba(stats_text)
    
    # 3. تفسير LLM
    interpretation = await interpret_with_llm(mamba_out)

    entry = {
        "time": datetime.utcnow().isoformat(),
            "posts_count": 1, 
            "channel_title": posts[0].get('title'),
            "channel_members": posts[0].get('members'),
            "advice": interpretation
        }

    save_log(entry)
    log.info("Analysis saved.")
    
    # تنسيق الخرج 
    output_message = (
        f"**✅ اكتمل التحليل بنجاح!**\n\n"
        f"**القناة:** {posts[0].get('title')}\n"
        f"**عدد الأعضاء:** {posts[0].get('members')}\n"
        f"**وقت التحليل:** {entry['time']}\n\n"
        f"--- **توصيات الذكاء الاصطناعي** ---\n"
        f"{interpretation}\n\n"
        f"--- **إخراج Mamba الخام** ---\n"
        f"`{mamba_out.strip()}`"
    )
    
    return {"status": "success", "message": output_message, "log_entry": entry}


# ---------------- Gradio Interface Functions ----------------

async def gradio_run_once(limit: int) -> str:
    """Gradio function to run the analysis manually."""
    
    result = await run_analysis_pipeline(limit=limit)
    
    if result.get("status") == "success":
        return result["message"]
    else:
        return f"**⚠️ فشل التحليل:** {result['message']}"

def gradio_get_logs() -> str:
    """Gradio function to display logs."""
    logs = []
    if os.path.exists(LOG_PATH):
        try:
            with open(LOG_PATH, "r", encoding="utf-8") as f:
                logs = json.load(f)
        except Exception as e:
            log.error(f"Error reading log file: {e}")
            return "حدث خطأ أثناء قراءة ملف السجلات."

    if not logs:
        return "لا توجد سجلات تحليل متاحة."
        
    output = "## 📄 سجلات التحليل الأخيرة\n"
    for i, entry in enumerate(logs):
        output += f"### {i+1}. تحليل بتاريخ {entry.get('time', 'N/A')}\n"
        if entry.get('error'):
            output += f"**❌ خطأ:** {entry['error']}\n"
        else:
            output += f"**✅ القناة:** {entry.get('channel_title', 'N/A')}\n"
            output += f"**💡 النصيحة:**\n```\n{entry.get('advice', 'N/A')[:300]}...\n```\n"
        output += "---\n"
        
    return output

# ---------------- Scheduler ----------------
def daily_job_wrapper():
    """Synchronous wrapper to run the async job in the scheduler."""
    log.info("Running scheduled analysis job via Gradio wrapper...")
    try:
        # تشغيل الدالة async في حلقة الحدث الخاصة بها
        result = asyncio.run(run_analysis_pipeline())
        log.info(f"Scheduled job completed. Status: {result.get('status')}")
    except Exception as e:
        log.error(f"Error in scheduled job: {e}")


# ---------------- Gradio Interface Definition ----------------
# تم وضع التعريف هنا لاستخدامه لاحقاً في خيط منفصل

with gr.Blocks(title="Telegram Channel Analyzer Agent") as demo:
    gr.Markdown("# 🤖 وكيل تحليل قناة Telegram (Gradio & Scheduler)")
    gr.Markdown(f"**حالة الخدمة:** {STATUS_MESSAGE}")
    gr.Markdown(f"**القناة المستهدفة:** `{TG_CHANNEL}`")
    gr.Markdown(f"**جسر Replit:** `{REPLIT_PROXY_URL}`")
    gr.Markdown("---")
    
    with gr.Tab("تشغيل التحليل يدوياً"):
        gr.Markdown("## ⚙️ تشغيل لمرة واحدة - جلب إحصائيات القناة الأساسية")
        limit_input = gr.Slider(
            minimum=10, 
            maximum=300, 
            step=10, 
            value=POSTS_LIMIT, 
            label="الحد الأقصى للمنشورات (غير مستخدم مع Bot API)"
        )
        run_button = gr.Button("🚀 بدء تحليل معلومات القناة الآن")
        
        output_textbox = gr.Markdown(label="نتيجة التحليل")
        
        run_button.click(
            fn=gradio_run_once, 
            inputs=[limit_input], 
            outputs=[output_textbox]
        )
        
    with gr.Tab("سجلات التحليل"):
        gr.Markdown("## 📋 سجلات التحليل المحفوظة")
        log_button = gr.Button("🔄 تحديث السجلات")
        log_output = gr.Markdown(label="السجلات")
        
        log_button.click(
            fn=gradio_get_logs, 
            inputs=[], 
            outputs=[log_output]
        )
        demo.load(
            fn=gradio_get_logs, 
            inputs=[], 
            outputs=[log_output]
        )

# ---------------- Main Entry Point (Gradio/Scheduler) ----------------
if __name__ == "__main__":
    if not IS_SERVICE_READY:
        log.error("Service is not ready. Exiting.")
        exit(1)
        
    # <<< تم حذف كل ما يتعلق بـ Application و Handlers (Webhook/Polling) >>>

    # 1. تشغيل Gradio في خيط منفصل
    def start_gradio():
        log.info("Starting Gradio Interface in a separate thread...")
        # استخدام المنفذ الذي يحدده المضيف (عادةً Render)
        PORT = int(os.environ.get("PORT", "7860")) 
        try:
            demo.launch(server_name="0.0.0.0", server_port=PORT) 
        except Exception as e:
            log.error(f"Failed to start Gradio: {e}")

    threading.Thread(target=start_gradio, daemon=True).start()
    
    # 2. بدء المجدول للمهمة الدورية
    scheduler.add_job(daily_job_wrapper, "cron", hour=21, minute=10) 
    scheduler.start()
    log.info("Scheduler started for daily analysis.")

    # 3. إبقاء الخيط الرئيسي حياً لضمان استمرار عمل المجدول و Gradio
    try:
        log.info("Keeping main thread alive for Gradio and Scheduler...")
        # حلقة بسيطة لإبقاء البرنامج قيد التشغيل (بديل لـ run_polling)
        while True:
            time.sleep(1)
    except (KeyboardInterrupt, SystemExit):
        log.info("Exiting gracefully.")
    except Exception as e:
        log.error(f"Error in main loop: {e}")

    log.info("Analyzer Agent stopped.")