Spaces:
Paused
Paused
| """ | |
| Analytics Database Module for Query Logging and Performance Tracking. | |
| Tracks every query, method, answer, and citation for comprehensive analytics. | |
| """ | |
| import sqlite3 | |
| import json | |
| import time | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from pathlib import Path | |
| import logging | |
| from config import DATA_DIR | |
# Module-level logger, named after this module so log records can be filtered by origin.
logger = logging.getLogger(__name__)

# Path of the SQLite analytics database file, located under DATA_DIR (imported from config).
ANALYTICS_DB = DATA_DIR / "analytics.db"
class AnalyticsDB:
    """SQLite-backed store for query analytics and logging.

    Each public method opens its own short-lived connection to ``db_path``
    and closes it before returning, so an instance never holds an open
    database handle between calls.

    Tables (created on construction when missing):
        queries: one row per logged query/answer pair.
        citations: one row per citation, linked to ``queries.query_id``.
        performance_metrics: per-query timing breakdown. The table is created
            here but is neither written nor read by this class.
    """

    # DDL executed by ``_init_database``; every statement is idempotent.
    _SCHEMA = (
        # Main queries table
        '''
        CREATE TABLE IF NOT EXISTS queries (
            query_id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            user_query TEXT NOT NULL,
            retrieval_method TEXT NOT NULL,
            answer TEXT NOT NULL,
            response_time_ms REAL,
            num_citations INTEGER DEFAULT 0,
            image_path TEXT,
            error_message TEXT,
            top_k_used INTEGER DEFAULT 5,
            additional_settings TEXT,
            answer_length INTEGER,
            session_id TEXT
        )
        ''',
        # Citations table
        '''
        CREATE TABLE IF NOT EXISTS citations (
            citation_id INTEGER PRIMARY KEY AUTOINCREMENT,
            query_id INTEGER NOT NULL,
            source TEXT NOT NULL,
            citation_type TEXT,
            relevance_score REAL,
            bm25_score REAL,
            rerank_score REAL,
            similarity_score REAL,
            url TEXT,
            path TEXT,
            rank INTEGER,
            FOREIGN KEY (query_id) REFERENCES queries (query_id)
        )
        ''',
        # Performance metrics table
        '''
        CREATE TABLE IF NOT EXISTS performance_metrics (
            metric_id INTEGER PRIMARY KEY AUTOINCREMENT,
            query_id INTEGER NOT NULL,
            retrieval_time_ms REAL,
            generation_time_ms REAL,
            total_time_ms REAL,
            chunks_retrieved INTEGER,
            tokens_estimated INTEGER,
            FOREIGN KEY (query_id) REFERENCES queries (query_id)
        )
        ''',
    )

    # SQL predicate selecting voice-originated queries. "_" is a
    # single-character wildcard in LIKE, so it is escaped with "!" to match a
    # literal underscore (otherwise e.g. session "voicemail1" would match).
    # NOTE(review): the additional_settings test matches any settings text
    # containing "voice_interaction", whatever value is stored for it.
    _VOICE_FILTER = (
        "(additional_settings LIKE '%voice!_interaction%' ESCAPE '!' "
        "OR session_id LIKE 'voice!_%' ESCAPE '!')"
    )

    def __init__(self, db_path: Optional[Path] = None):
        """Create the manager and make sure the schema exists.

        Args:
            db_path: Location of the SQLite file. Defaults to the module-level
                ``ANALYTICS_DB`` when omitted.
        """
        self.db_path = ANALYTICS_DB if db_path is None else db_path
        self._init_database()

    def _init_database(self):
        """Initialize analytics database with required tables."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            for statement in self._SCHEMA:
                cursor.execute(statement)
            conn.commit()
        finally:
            # Close even when a DDL statement fails so the handle is not leaked.
            conn.close()
        logger.info("Analytics database initialized")

    def log_query(self, user_query: str, method: str, answer: str,
                  citations: List[Dict], response_time: Optional[float] = None,
                  image_path: Optional[str] = None,
                  error_message: Optional[str] = None,
                  top_k: int = 5, additional_settings: Optional[Dict] = None,
                  session_id: Optional[str] = None) -> Optional[int]:
        """
        Log a complete query interaction.

        The query row and all of its citation rows are written in a single
        transaction: either everything is stored or nothing is.

        Args:
            user_query: The user's question
            method: Retrieval method used
            answer: Generated answer
            citations: List of citation dictionaries; ``None`` is treated as
                an empty list
            response_time: Time taken in milliseconds
            image_path: Path to uploaded image (if any)
            error_message: Error message (if any)
            top_k: Number of chunks retrieved
            additional_settings: Method-specific settings (stored as JSON)
            session_id: Session identifier

        Returns:
            The ID of the logged query, or ``None`` when logging failed (the
            error is logged and the transaction rolled back).
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            citations = list(citations) if citations is not None else []

            # Insert main query record
            cursor.execute('''
                INSERT INTO queries (
                    timestamp, user_query, retrieval_method, answer,
                    response_time_ms, num_citations, image_path, error_message,
                    top_k_used, additional_settings, answer_length, session_id
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                datetime.now().isoformat(),
                user_query,
                method,
                answer,
                response_time,
                len(citations),
                image_path,
                error_message,
                top_k,
                json.dumps(additional_settings) if additional_settings else None,
                len(answer),
                session_id,
            ))
            # Capture the id now: executemany() below does not preserve it.
            query_id = cursor.lastrowid

            # Insert citations; rank is the 1-based position in the input list.
            cursor.executemany('''
                INSERT INTO citations (
                    query_id, source, citation_type, relevance_score,
                    bm25_score, rerank_score, similarity_score, url, path, rank
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', [
                (
                    query_id,
                    # source is NOT NULL: an explicit None must not abort the log.
                    citation.get('source') or '',
                    citation.get('type', ''),
                    citation.get('relevance_score'),
                    citation.get('bm25_score'),
                    citation.get('rerank_score'),
                    citation.get('similarity_score'),
                    citation.get('url'),
                    citation.get('path'),
                    rank,
                )
                for rank, citation in enumerate(citations, 1)
            ])

            conn.commit()
            logger.info("Logged query %s with %s citations", query_id, len(citations))
            return query_id
        except Exception as e:
            # Logging analytics is best-effort: never break the caller.
            logger.error("Error logging query: %s", e)
            conn.rollback()
            return None
        finally:
            conn.close()

    def get_query_stats(self, days: int = 30) -> Dict[str, Any]:
        """Get comprehensive query statistics.

        Args:
            days: Size of the look-back window, counted back from now.

        Returns:
            A dict with the keys ``total_queries``, ``method_usage``,
            ``avg_response_times``, ``avg_citations``, ``total_citations``,
            ``citation_types``, ``error_rate`` (percentage) and
            ``top_keywords``; an empty dict when a query fails.
        """
        # Timestamps are stored as ISO-8601 text, so string comparison orders them.
        since_date = (datetime.now() - timedelta(days=days)).isoformat()
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            stats: Dict[str, Any] = {}

            # Total queries
            cursor.execute('''
                SELECT COUNT(*) FROM queries
                WHERE timestamp >= ?
            ''', (since_date,))
            stats['total_queries'] = cursor.fetchone()[0]

            # Method usage
            cursor.execute('''
                SELECT retrieval_method, COUNT(*) as count
                FROM queries
                WHERE timestamp >= ?
                GROUP BY retrieval_method
                ORDER BY count DESC
            ''', (since_date,))
            stats['method_usage'] = dict(cursor.fetchall())

            # Average response times by method
            cursor.execute('''
                SELECT retrieval_method, AVG(response_time_ms) as avg_time
                FROM queries
                WHERE timestamp >= ? AND response_time_ms IS NOT NULL
                GROUP BY retrieval_method
            ''', (since_date,))
            stats['avg_response_times'] = dict(cursor.fetchall())

            # Citation statistics (aggregates are NULL when no rows match)
            cursor.execute('''
                SELECT AVG(num_citations) as avg_citations,
                       SUM(num_citations) as total_citations
                FROM queries
                WHERE timestamp >= ?
            ''', (since_date,))
            avg_citations, total_citations = cursor.fetchone()
            stats['avg_citations'] = avg_citations or 0
            stats['total_citations'] = total_citations or 0

            # Citation types
            cursor.execute('''
                SELECT c.citation_type, COUNT(*) as count
                FROM citations c
                JOIN queries q ON c.query_id = q.query_id
                WHERE q.timestamp >= ?
                GROUP BY c.citation_type
                ORDER BY count DESC
            ''', (since_date,))
            stats['citation_types'] = dict(cursor.fetchall())

            # Error rate
            cursor.execute('''
                SELECT
                    COUNT(CASE WHEN error_message IS NOT NULL THEN 1 END) as errors,
                    COUNT(*) as total
                FROM queries
                WHERE timestamp >= ?
            ''', (since_date,))
            errors, total = cursor.fetchone()
            stats['error_rate'] = (errors / total) * 100 if total > 0 else 0

            # Most common query topics (simple word analysis): count
            # lower-cased whitespace-separated words longer than 3 characters.
            cursor.execute('''
                SELECT user_query FROM queries
                WHERE timestamp >= ?
            ''', (since_date,))
            keywords: Dict[str, int] = {}
            for (text,) in cursor.fetchall():
                for word in text.lower().split():
                    if len(word) > 3:
                        keywords[word] = keywords.get(word, 0) + 1

            # Top 10 keywords
            top_keywords = sorted(keywords.items(),
                                  key=lambda item: item[1],
                                  reverse=True)[:10]
            stats['top_keywords'] = dict(top_keywords)
            return stats
        except Exception as e:
            logger.error("Error getting query stats: %s", e)
            return {}
        finally:
            conn.close()

    def get_method_performance(self) -> Dict[str, Dict[str, float]]:
        """Get detailed performance metrics by method.

        Only queries with a recorded response time are considered.

        Returns:
            Mapping of retrieval method to ``avg_response_time``,
            ``avg_citations``, ``avg_answer_length`` and ``query_count``;
            an empty dict when the query fails.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT
                    retrieval_method,
                    AVG(response_time_ms) as avg_response_time,
                    AVG(num_citations) as avg_citations,
                    AVG(answer_length) as avg_answer_length,
                    COUNT(*) as query_count
                FROM queries
                WHERE response_time_ms IS NOT NULL
                GROUP BY retrieval_method
            ''')
            return {
                method: {
                    'avg_response_time': avg_time,
                    'avg_citations': avg_cites,
                    'avg_answer_length': avg_length,
                    'query_count': count,
                }
                for method, avg_time, avg_cites, avg_length, count in cursor.fetchall()
            }
        except Exception as e:
            logger.error("Error getting method performance: %s", e)
            return {}
        finally:
            conn.close()

    def get_recent_queries(self, limit: int = 20, include_answers: bool = True) -> List[Dict[str, Any]]:
        """Get recent queries with basic information and optionally full answers.

        Args:
            limit: Maximum number of rows to return.
            include_answers: When true, each row also carries ``answer`` and
                ``error_message``.

        Returns:
            Newest-first list of dicts; an empty list when the query fails.
            Note that the ``citations`` key holds the citation *count*.
        """
        if include_answers:
            db_columns = ('query_id, timestamp, user_query, retrieval_method, '
                          'answer, answer_length, num_citations, response_time_ms, '
                          'error_message')
            keys = ['query_id', 'timestamp', 'query', 'method',
                    'answer', 'answer_length', 'citations', 'response_time',
                    'error_message']
        else:
            db_columns = ('query_id, timestamp, user_query, retrieval_method, '
                          'answer_length, num_citations, response_time_ms')
            keys = ['query_id', 'timestamp', 'query', 'method',
                    'answer_length', 'citations', 'response_time']

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # query_id breaks ties between rows that share a timestamp so the
            # ordering is deterministic.
            cursor.execute(
                f'SELECT {db_columns} FROM queries '
                'ORDER BY timestamp DESC, query_id DESC LIMIT ?',
                (limit,),
            )
            return [dict(zip(keys, row)) for row in cursor.fetchall()]
        except Exception as e:
            logger.error("Error getting recent queries: %s", e)
            return []
        finally:
            conn.close()

    def get_query_with_citations(self, query_id: int) -> Dict[str, Any]:
        """Get full query details including citations.

        Args:
            query_id: Primary key of the query to load.

        Returns:
            The query fields plus a ``citations`` list ordered by rank; an
            empty dict when the id is unknown or the lookup fails.
        """
        query_keys = ('query_id', 'timestamp', 'user_query', 'method', 'answer',
                      'response_time', 'num_citations', 'error_message',
                      'top_k_used')
        citation_keys = ('source', 'type', 'relevance_score', 'bm25_score',
                         'rerank_score', 'similarity_score', 'url', 'path',
                         'rank')

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # Get query details
            cursor.execute('''
                SELECT query_id, timestamp, user_query, retrieval_method, answer,
                       response_time_ms, num_citations, error_message, top_k_used
                FROM queries WHERE query_id = ?
            ''', (query_id,))
            query_row = cursor.fetchone()
            if not query_row:
                return {}
            query_data: Dict[str, Any] = dict(zip(query_keys, query_row))

            # Get citations
            cursor.execute('''
                SELECT source, citation_type, relevance_score, bm25_score,
                       rerank_score, similarity_score, url, path, rank
                FROM citations WHERE query_id = ?
                ORDER BY rank
            ''', (query_id,))
            query_data['citations'] = [
                dict(zip(citation_keys, row)) for row in cursor.fetchall()
            ]
            return query_data
        except Exception as e:
            logger.error("Error getting query with citations: %s", e)
            return {}
        finally:
            conn.close()

    def get_query_trends(self, days: int = 30) -> Dict[str, Any]:
        """Get query trends over time.

        Args:
            days: Size of the look-back window, counted back from now.

        Returns:
            ``{'daily_queries': [(date, count), ...],
            'method_trends': {method: [(date, count), ...]}}``, both in
            ascending date order; an empty dict when a query fails.
        """
        since_date = (datetime.now() - timedelta(days=days)).isoformat()
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # Queries per day
            cursor.execute('''
                SELECT DATE(timestamp) as date, COUNT(*) as count
                FROM queries
                WHERE timestamp >= ?
                GROUP BY DATE(timestamp)
                ORDER BY date
            ''', (since_date,))
            daily_queries = cursor.fetchall()

            # Method usage trends
            cursor.execute('''
                SELECT DATE(timestamp) as date, retrieval_method, COUNT(*) as count
                FROM queries
                WHERE timestamp >= ?
                GROUP BY DATE(timestamp), retrieval_method
                ORDER BY date, retrieval_method
            ''', (since_date,))
            method_trends: Dict[str, List[Tuple[str, int]]] = {}
            for date, method, count in cursor.fetchall():
                method_trends.setdefault(method, []).append((date, count))

            return {
                'daily_queries': daily_queries,
                'method_trends': method_trends,
            }
        except Exception as e:
            logger.error("Error getting query trends: %s", e)
            return {}
        finally:
            conn.close()

    def get_voice_interaction_stats(self) -> Dict[str, Any]:
        """Get statistics about voice interactions.

        A query counts as a voice interaction when it matches
        ``_VOICE_FILTER``: its stored settings mention ``voice_interaction``
        or its session id starts with ``voice_``.

        Returns:
            ``total_voice_queries``, ``voice_by_method`` and
            ``avg_voice_response_time`` (0 when nothing was timed); an empty
            dict when the database cannot be opened or queried.
        """
        conn = None  # stays None if connect() itself fails; see finally
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            # Count voice interactions
            cursor.execute(
                'SELECT COUNT(*) as total_voice_queries FROM queries '
                f'WHERE {self._VOICE_FILTER}'
            )
            result = cursor.fetchone()
            total_voice = result[0] if result else 0

            # Get voice queries by method
            cursor.execute(
                'SELECT retrieval_method, COUNT(*) as count FROM queries '
                f'WHERE {self._VOICE_FILTER} '
                'GROUP BY retrieval_method'
            )
            voice_by_method = dict(cursor.fetchall())

            # Average response time for voice queries
            cursor.execute(
                'SELECT AVG(response_time_ms) as avg_response_time FROM queries '
                f'WHERE {self._VOICE_FILTER} '
                'AND response_time_ms IS NOT NULL'
            )
            result = cursor.fetchone()
            avg_response_time = result[0] if result and result[0] else 0

            return {
                'total_voice_queries': total_voice,
                'voice_by_method': voice_by_method,
                'avg_voice_response_time': avg_response_time,
            }
        except Exception as e:
            logger.error("Error getting voice interaction stats: %s", e)
            return {}
        finally:
            # Guarded: an unconditional close() would raise when connect() failed.
            if conn is not None:
                conn.close()
# Global instance used by the module-level convenience functions.
# NOTE: it is constructed at import time, which connects to the database file
# and creates the analytics tables if they do not exist yet.
analytics_db = AnalyticsDB()
| # Convenience functions | |
def log_query(user_query: str, method: str, answer: str, citations: List[Dict],
              **kwargs) -> int:
    """Record one query interaction through the shared ``analytics_db`` instance.

    Extra keyword arguments are forwarded unchanged to
    ``AnalyticsDB.log_query``. The value that method returns is handed back
    as-is.
    """
    query_id = analytics_db.log_query(
        user_query,
        method,
        answer,
        citations,
        **kwargs,
    )
    return query_id
def get_analytics_stats(days: int = 30) -> Dict[str, Any]:
    """Return query statistics for the last ``days`` days from the shared instance."""
    stats = analytics_db.get_query_stats(days)
    return stats
def get_method_performance() -> Dict[str, Dict[str, float]]:
    """Return per-retrieval-method performance metrics from the shared instance."""
    performance = analytics_db.get_method_performance()
    return performance