Spaces:
				
			
			
	
			
			
		Runtime error
		
	
	
	
			
			
	
	
	
	
		
		
		Runtime error
		
	| from flask import Flask, request, jsonify, render_template, flash, redirect, url_for, current_app | |
| from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user | |
| from werkzeug.security import generate_password_hash, check_password_hash | |
| from transformers import pipeline | |
| from flask import session | |
| import torch | |
| from pydub import AudioSegment | |
| import os | |
| import io | |
| import uuid | |
| from datetime import datetime | |
| import sqlite3 | |
| from pathlib import Path | |
| import whisper | |
| from extensions import db, login_manager | |
| import json | |
| from admin import admin_bp | |
| from flask_migrate import Migrate | |
| from models import User | |
| from werkzeug.utils import secure_filename | |
| from forms import EditProfileForm | |
| instance_path = Path(__file__).parent / 'instance' | |
| instance_path.mkdir(exist_ok=True, mode=0o755) | |
| app = Flask(__name__) | |
| app.secret_key = 'очень_сложный_секретный_ключ_здесь' | |
| db_path = instance_path / 'chats.db' | |
| app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{db_path}' | |
| app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False | |
| # Инициализация Flask-Login | |
| db.init_app(app) | |
| login_manager.init_app(app) | |
| login_manager.login_view = 'welcome' | |
| migrate = Migrate(app, db) | |
| # Инициализация моделей | |
| def init_models(): | |
| try: | |
| emotion_map = { | |
| 'joy': '😊 Радость', | |
| 'neutral': '😐 Нейтрально', | |
| 'anger': '😠 Злость', | |
| 'sadness': '😢 Грусть', | |
| 'surprise': '😲 Удивление' | |
| } | |
| speech_to_text_model = whisper.load_model("base") | |
| text_classifier = pipeline( | |
| "text-classification", | |
| model="cointegrated/rubert-tiny2-cedr-emotion-detection" | |
| ) | |
| audio_classifier = pipeline( | |
| "audio-classification", | |
| model="superb/hubert-large-superb-er" | |
| ) | |
| return { | |
| 'emotion_map': emotion_map, | |
| 'speech_to_text_model': speech_to_text_model, | |
| 'text_classifier': text_classifier, | |
| 'audio_classifier': audio_classifier | |
| } | |
| except Exception as e: | |
| print(f"Ошибка загрузки моделей: {e}") | |
| return None | |
| models = init_models() | |
| if not models: | |
| raise RuntimeError("Не удалось загрузить модели") | |
| def datetimeformat(value, format='%d.%m.%Y %H:%M'): | |
| if value is None: | |
| return "" | |
| return value.strftime(format) | |
| def utility_processor(): | |
| return { | |
| 'emotion_map': { | |
| 'joy': '😊 Радость', | |
| 'neutral': '😐 Нейтрально', | |
| 'anger': '😠 Злость', | |
| 'sadness': '😢 Грусть', | |
| 'surprise': '😲 Удивление' | |
| }, | |
| 'get_emotion_color': lambda emotion: { | |
| 'joy': '#00b894', | |
| 'neutral': '#636e72', | |
| 'anger': '#d63031', | |
| 'sadness': '#0984e3', | |
| 'surprise': '#fdcb6e' | |
| }.get(emotion, '#4a4ae8') | |
| } | |
| # Импорт Blueprint | |
| from auth import auth_bp | |
| from profile import profile_bp | |
| app.register_blueprint(auth_bp) | |
| app.register_blueprint(profile_bp) | |
| app.register_blueprint(admin_bp, url_prefix='/admin') | |
| # Делаем переменные доступными | |
| emotion_map = models['emotion_map'] | |
| speech_to_text_model = models['speech_to_text_model'] | |
| text_classifier = models['text_classifier'] | |
| audio_classifier = models['audio_classifier'] | |
| def create_admin(): | |
| """Создание администратора""" | |
| email = input("Введите email: ") | |
| password = input("Введите пароль: ") | |
| user = User.query.filter_by(email=email).first() | |
| if user: | |
| user.is_admin = True | |
| user.set_password(password) | |
| else: | |
| user = User(email=email, username=email, is_admin=True) | |
| user.set_password(password) | |
| db.session.add(user) | |
| db.session.commit() | |
| print(f"Администратор {email} создан") | |
| def transcribe_audio(audio_path): | |
| """Преобразование аудио в текст с помощью Whisper""" | |
| if not speech_to_text_model: | |
| return None | |
| try: | |
| result = speech_to_text_model.transcribe(audio_path, language="ru") | |
| return result["text"] | |
| except Exception as e: | |
| print(f"Ошибка преобразования аудио в текст: {e}") | |
| return None | |
| # Инициализация Flask-Login | |
| login_manager = LoginManager(app) | |
| login_manager.login_view = 'auth_bp.login' | |
| login_manager.login_message = "Для доступа к этой странице необходимо авторизоваться" | |
| login_manager.login_message_category = "info" | |
| def load_user(user_id): | |
| from models import User | |
| return User.query.get(int(user_id)) | |
| # Инициализация БД | |
| def get_db_connection(): | |
| instance_path = Path('instance') | |
| instance_path.mkdir(exist_ok=True) | |
| db_path = instance_path / 'chats.db' | |
| conn = sqlite3.connect(str(db_path)) | |
| conn.row_factory = sqlite3.Row | |
| return conn | |
| def init_db(): | |
| conn = get_db_connection() | |
| try: | |
| conn.execute(''' | |
| CREATE TABLE IF NOT EXISTS users ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| username TEXT UNIQUE NOT NULL, | |
| email TEXT UNIQUE NOT NULL, | |
| password_hash TEXT NOT NULL, | |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| ''') | |
| conn.execute(''' | |
| CREATE TABLE IF NOT EXISTS chats ( | |
| chat_id TEXT PRIMARY KEY, | |
| user_id INTEGER, | |
| created_at TEXT, | |
| title TEXT, | |
| FOREIGN KEY(user_id) REFERENCES users(id) | |
| ) | |
| ''') | |
| conn.execute(''' | |
| CREATE TABLE IF NOT EXISTS messages ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| chat_id TEXT, | |
| sender TEXT, | |
| content TEXT, | |
| timestamp TEXT, | |
| FOREIGN KEY(chat_id) REFERENCES chats(chat_id) | |
| ) | |
| ''') | |
| conn.execute(''' | |
| CREATE TABLE IF NOT EXISTS analysis_reports ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_id INTEGER, | |
| content TEXT, | |
| emotion TEXT, | |
| confidence REAL, | |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP, | |
| FOREIGN KEY(user_id) REFERENCES users(id) | |
| ) | |
| ''') | |
| conn.commit() | |
| finally: | |
| conn.close() | |
| init_db() | |
| # Маршруты аутентификации | |
| def login(): | |
| if request.method == 'POST': | |
| email = request.form.get('email') | |
| password = request.form.get('password') | |
| conn = get_db_connection() | |
| user = conn.execute( | |
| "SELECT id, username, email, password_hash FROM users WHERE email = ?", | |
| (email,) | |
| ).fetchone() | |
| conn.close() | |
| if user and check_password_hash(user['password_hash'], password): | |
| user_obj = User(id=user['id'], username=user['username'], | |
| email=user['email'], password_hash=user['password_hash']) | |
| login_user(user_obj) | |
| session.pop('_flashes', None) | |
| return redirect(url_for('index')) | |
| else: | |
| flash('Неверный email или пароль', 'danger') # <-- теперь здесь | |
| return render_template('auth/login.html') | |
| def register(): | |
| if request.method == 'POST': | |
| username = request.form.get('username') | |
| email = request.form.get('email') | |
| password = request.form.get('password') | |
| confirm_password = request.form.get('confirm_password') | |
| if password != confirm_password: | |
| flash('Пароли не совпадают', 'danger') | |
| return redirect(url_for('register')) | |
| conn = get_db_connection() | |
| try: | |
| password_hash = generate_password_hash(password) | |
| conn.execute( | |
| "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)", | |
| (username, email, password_hash) | |
| ) | |
| conn.commit() | |
| flash('Регистрация прошла успешно! Теперь вы можете войти.', 'success') | |
| return redirect(url_for('login')) | |
| except sqlite3.IntegrityError: | |
| flash('Пользователь с таким email или именем уже существует', 'danger') | |
| finally: | |
| conn.close() | |
| return render_template('auth/register.html') | |
| from werkzeug.security import check_password_hash, generate_password_hash | |
| def edit_profile(): | |
| form = EditProfileForm(obj=current_user) | |
| if form.validate_on_submit(): | |
| # Обновляем имя и почту | |
| current_user.username = form.username.data | |
| current_user.email = form.email.data | |
| # Обновляем аватар | |
| if form.avatar.data: | |
| filename = secure_filename(form.avatar.data.filename) | |
| unique_filename = f"{uuid.uuid4().hex}_{filename}" | |
| avatar_path = os.path.join(current_app.root_path, 'static/avatars', unique_filename) | |
| form.avatar.data.save(avatar_path) | |
| current_user.avatar = unique_filename | |
| # Обработка смены пароля | |
| if form.current_password.data: | |
| # Проверяем текущий пароль | |
| if check_password_hash(current_user.password_hash, form.current_password.data): | |
| # Меняем пароль на новый | |
| current_user.password_hash = generate_password_hash(form.new_password.data) | |
| flash('Пароль успешно изменён', 'success') | |
| else: | |
| flash('Текущий пароль неверный', 'danger') | |
| return redirect(url_for('edit_profile')) | |
| db.session.commit() | |
| flash('Профиль обновлён', 'success') | |
| return redirect(url_for('profile')) | |
| return render_template('edit_profile.html', form=form) | |
| # Основные маршруты | |
| def welcome(): | |
| return render_template("welcome.html") | |
| def logout(): | |
| session.clear() | |
| logout_user() | |
| return redirect(url_for('welcome')) | |
| def index(): | |
| if current_user.is_authenticated: | |
| conn = get_db_connection() | |
| try: | |
| chats = conn.execute( | |
| "SELECT chat_id, title FROM chats WHERE user_id = ? ORDER BY created_at DESC", | |
| (current_user.id,) | |
| ).fetchall() | |
| return render_template("index.html", chats=chats) | |
| finally: | |
| conn.close() | |
| return redirect(url_for('welcome')) | |
| def profile(): | |
| conn = get_db_connection() | |
| try: | |
| # Запрашиваем все анализы пользователя | |
| reports = conn.execute( | |
| "SELECT * FROM analysis_reports WHERE user_id = ? ORDER BY created_at DESC", | |
| (current_user.id,) | |
| ).fetchall() | |
| # Статистика: общее количество анализов | |
| total_reports = len(reports) | |
| # Статистика: самая частая эмоция | |
| emotion_counts = {} | |
| for r in reports: | |
| emotion_counts[r['emotion']] = emotion_counts.get(r['emotion'], 0) + 1 | |
| most_common_emotion = max(emotion_counts, key=emotion_counts.get) if emotion_counts else None | |
| return render_template( | |
| "profile.html", | |
| reports=reports, | |
| total_reports=total_reports, | |
| most_common_emotion=most_common_emotion, | |
| emotion_map={ | |
| 'joy': '😊 Радость', | |
| 'neutral': '😐 Нейтрально', | |
| 'anger': '😠 Злость', | |
| 'sadness': '😢 Грусть', | |
| 'surprise': '😲 Удивление' | |
| } | |
| ) | |
| except Exception as e: | |
| flash(f"Ошибка загрузки данных: {e}", "danger") | |
| return redirect(url_for('index')) | |
| finally: | |
| conn.close() | |
| def analyze_text(): | |
| if not text_classifier: | |
| return jsonify({"error": "Model not loaded"}), 500 | |
| try: | |
| data = request.get_json() | |
| text = data.get("text", "").strip() | |
| if not text: | |
| return jsonify({"error": "Empty text"}), 400 | |
| # Получаем предсказания модели | |
| result = text_classifier(text) | |
| # Проверяем структуру ответа | |
| if not result or not isinstance(result, list): | |
| return jsonify({"error": "Invalid model response"}), 500 | |
| # Берем первый результат (самый вероятный) | |
| prediction = result[0] if result else {} | |
| # Проверяем наличие нужных полей | |
| if not all(key in prediction for key in ['label', 'score']): | |
| return jsonify({"error": "Invalid prediction format"}), 500 | |
| # Сохраняем в базу данных | |
| conn = get_db_connection() | |
| conn.execute( | |
| "INSERT INTO analysis_reports (user_id, content, emotion, confidence) VALUES (?, ?, ?, ?)", | |
| (current_user.id, text, prediction['label'], prediction['score']) | |
| ) | |
| conn.commit() | |
| conn.close() | |
| return jsonify({ | |
| "emotion": emotion_map.get(prediction['label'], "❓ Неизвестно"), | |
| "confidence": float(prediction['score']) | |
| }) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| def analyze_audio(): | |
| if not audio_classifier or not speech_to_text_model: | |
| return jsonify({"error": "Model not loaded"}), 500 | |
| if 'audio' not in request.files: | |
| return jsonify({'error': 'No audio file'}), 400 | |
| try: | |
| audio_file = request.files['audio'] | |
| temp_path = "temp_audio.wav" | |
| audio = AudioSegment.from_file(io.BytesIO(audio_file.read())) | |
| audio = audio.set_frame_rate(16000).set_channels(1) | |
| audio.export(temp_path, format="wav", codec="pcm_s16le") | |
| transcribed_text = transcribe_audio(temp_path) | |
| result = audio_classifier(temp_path) | |
| os.remove(temp_path) | |
| emotion_mapping = { | |
| 'hap': 'happy', | |
| 'sad': 'sad', | |
| 'neu': 'neutral', | |
| 'ang': 'angry' | |
| } | |
| emotions = {emotion_mapping.get(item['label'].lower(), 'neutral'): item['score'] | |
| for item in result if item['label'].lower() in emotion_mapping} | |
| dominant_emotion = max(emotions.items(), key=lambda x: x[1]) | |
| response_map = { | |
| 'happy': '😊 Радость', | |
| 'sad': '😢 Грусть', | |
| 'angry': '😠 Злость', | |
| 'neutral': '😐 Нейтрально' | |
| } | |
| conn = get_db_connection() | |
| conn.execute( | |
| "INSERT INTO analysis_reports (user_id, content, emotion, confidence) VALUES (?, ?, ?, ?)", | |
| (current_user.id, transcribed_text, dominant_emotion[0], dominant_emotion[1]) | |
| ) | |
| conn.commit() | |
| conn.close() | |
| return jsonify({ | |
| 'emotion': response_map.get(dominant_emotion[0], 'неизвестно'), | |
| 'confidence': round(dominant_emotion[1], 2), | |
| 'transcribed_text': transcribed_text if transcribed_text else "Не удалось распознать текст" | |
| }) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| def analyze_telegram_chat(): | |
| if 'file' not in request.files: | |
| return jsonify({'error': 'No file uploaded'}), 400 | |
| file = request.files['file'] | |
| if file.filename.split('.')[-1].lower() != 'json': | |
| return jsonify({'error': 'Invalid file format. Only JSON allowed'}), 400 | |
| try: | |
| data = json.load(file) | |
| messages = [] | |
| for msg in data.get('messages', []): | |
| text = msg.get('text') | |
| sender = msg.get('from') or msg.get('sender') or 'Неизвестный пользователь' | |
| if isinstance(text, str) and len(text.strip()) > 5: | |
| messages.append({ | |
| 'text': text, | |
| 'timestamp': msg.get('date', datetime.now().isoformat()), | |
| 'from': sender # <-- сохраняем имя отправителя | |
| }) | |
| if not messages: | |
| return jsonify({'error': 'No valid text messages found'}), 400 | |
| results = [] | |
| for msg in messages[:500]: | |
| prediction = text_classifier(msg['text'])[0] | |
| results.append({ | |
| 'text': msg['text'], | |
| 'emotion': prediction['label'], | |
| 'confidence': prediction['score'], | |
| 'timestamp': msg['timestamp'], | |
| 'from': msg['from'] # <-- передаем дальше | |
| }) | |
| conn = get_db_connection() | |
| conn.execute(''' | |
| CREATE TABLE IF NOT EXISTS telegram_analysis ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_id INTEGER, | |
| data TEXT, | |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP, | |
| FOREIGN KEY(user_id) REFERENCES users(id) | |
| ) | |
| ''') | |
| conn.execute( | |
| "INSERT INTO telegram_analysis (user_id, data) VALUES (?, ?)", | |
| (current_user.id, json.dumps(results)) | |
| ) | |
| conn.commit() | |
| return jsonify({ | |
| 'status': 'success', | |
| 'message_count': len(results), | |
| }) | |
| except Exception as e: | |
| print(f"Error during analysis: {e}") | |
| return jsonify({'error': f'Server error: {str(e)}'}), 500 | |
| finally: | |
| conn.close() | |
| def get_chats(): | |
| conn = get_db_connection() | |
| try: | |
| chats = conn.execute( | |
| "SELECT chat_id, title, created_at FROM chats WHERE user_id = ? ORDER BY created_at DESC", | |
| (current_user.id,) | |
| ).fetchall() | |
| return jsonify([dict(chat) for chat in chats]) | |
| finally: | |
| conn.close() | |
| def start_chat(): | |
| try: | |
| chat_id = str(uuid.uuid4()) | |
| title = f"Чат от {datetime.now().strftime('%d.%m.%Y %H:%M')}" | |
| conn = get_db_connection() | |
| conn.execute( | |
| "INSERT INTO chats (chat_id, user_id, created_at, title) VALUES (?, ?, ?, ?)", | |
| (chat_id, current_user.id, datetime.now(), title) | |
| ) | |
| conn.commit() | |
| return jsonify({ | |
| "success": True, | |
| "chat_id": chat_id, | |
| "title": title | |
| }) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| finally: | |
| conn.close() | |
| def delete_chat(chat_id): | |
| conn = get_db_connection() | |
| try: | |
| # Удаляем связанные сообщения | |
| conn.execute("DELETE FROM messages WHERE chat_id = ?", (chat_id,)) | |
| # Удаляем анализы эмоций, связанные с сообщениями этого чата | |
| # (если у вас есть связь между analysis_reports и chat_id) | |
| conn.execute(""" | |
| DELETE FROM analysis_reports | |
| WHERE content IN ( | |
| SELECT content FROM messages WHERE chat_id = ? | |
| ) AND user_id = ? | |
| """, (chat_id, current_user.id)) | |
| # Удаляем сам чат | |
| conn.execute("DELETE FROM chats WHERE chat_id = ? AND user_id = ?", | |
| (chat_id, current_user.id)) | |
| conn.commit() | |
| return jsonify({"success": True}) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| finally: | |
| conn.close() | |
| def get_telegram_analysis(): | |
| conn = get_db_connection() | |
| try: | |
| analyses = conn.execute( | |
| "SELECT id, data, created_at FROM telegram_analysis WHERE user_id = ?", | |
| (current_user.id,) | |
| ).fetchall() | |
| return jsonify([dict(analysis) for analysis in analyses]) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| finally: | |
| conn.close() | |
| def load_chat(chat_id): | |
| conn = get_db_connection() | |
| try: | |
| # Получаем информацию о чате | |
| chat = conn.execute( | |
| "SELECT chat_id, title FROM chats WHERE chat_id = ? AND user_id = ?", | |
| (chat_id, current_user.id) | |
| ).fetchone() | |
| if not chat: | |
| return jsonify({"error": "Чат не найден"}), 404 | |
| # Получаем сообщения чата | |
| messages = conn.execute( | |
| "SELECT sender, content FROM messages WHERE chat_id = ? ORDER BY timestamp ASC", | |
| (chat_id,) | |
| ).fetchall() | |
| return jsonify({ | |
| "chat_id": chat["chat_id"], | |
| "title": chat["title"], | |
| "messages": [dict(msg) for msg in messages] | |
| }) | |
| finally: | |
| conn.close() | |
| def save_message(): | |
| data = request.get_json() | |
| if not data or 'chat_id' not in data or 'content' not in data or 'sender' not in data: | |
| return jsonify({"error": "Неверные данные"}), 400 | |
| conn = get_db_connection() | |
| try: | |
| # Проверяем, что чат принадлежит текущему пользователю | |
| chat = conn.execute( | |
| "SELECT chat_id FROM chats WHERE chat_id = ? AND user_id = ?", | |
| (data['chat_id'], current_user.id) | |
| ).fetchone() | |
| if not chat: | |
| return jsonify({"error": "Чат не найден"}), 404 | |
| # Анализируем эмоцию в тексте | |
| emotion = "neutral" | |
| confidence = 0.0 | |
| if text_classifier and data['content'].strip(): | |
| try: | |
| predictions = text_classifier(data['content'])[0] | |
| top_prediction = max(predictions, key=lambda x: x["score"]) | |
| emotion = top_prediction["label"] | |
| confidence = top_prediction["score"] | |
| # Сохраняем анализ в базу | |
| conn.execute( | |
| "INSERT INTO analysis_reports (user_id, content, emotion, confidence) VALUES (?, ?, ?, ?)", | |
| (current_user.id, data['content'], emotion, confidence) | |
| ) | |
| except Exception as e: | |
| print(f"Ошибка анализа эмоции: {e}") | |
| # Сохраняем сообщение | |
| conn.execute( | |
| "INSERT INTO messages (chat_id, sender, content, timestamp) VALUES (?, ?, ?, ?)", | |
| (data['chat_id'], data['sender'], data['content'], datetime.now()) | |
| ) | |
| conn.commit() | |
| return jsonify({ | |
| "status": "success", | |
| "emotion": emotion_map.get(emotion, "❓ Неизвестно"), | |
| "confidence": round(confidence, 2) | |
| }) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| finally: | |
| conn.close() | |
| if __name__ == "__main__": | |
| app.run(debug=True) | |