Spaces:
Sleeping
Sleeping
| from datetime import datetime | |
| from collections import defaultdict, Counter | |
| from typing import List, Dict, Any, Optional | |
| import json | |
| import os | |
| # === Optional file persistence === | |
| ANALYTICS_FILE = "analytics_data.json" | |
| # === In-memory analytics store === | |
| analytics_store: Dict[str, List[Dict[str, Any]]] = defaultdict(list) | |
| # === Load existing analytics if present === | |
| def load_analytics(): | |
| if os.path.exists(ANALYTICS_FILE): | |
| with open(ANALYTICS_FILE, "r") as f: | |
| data = json.load(f) | |
| for user, events in data.items(): | |
| analytics_store[user] = events | |
| # === Save current analytics to file === | |
| def save_analytics(): | |
| with open(ANALYTICS_FILE, "w") as f: | |
| json.dump(analytics_store, f, indent=2) | |
| # === Main tracking function === | |
| def track_event(user: str, action: str, ip: Optional[str] = None, user_agent: Optional[str] = None): | |
| event = { | |
| "action": action, | |
| "timestamp": datetime.utcnow().isoformat(), | |
| "ip": ip, | |
| "user_agent": user_agent | |
| } | |
| analytics_store[user].append(event) | |
| save_analytics() | |
| # === Get raw event list for a user === | |
| def get_user_analytics(user: str) -> List[Dict[str, Any]]: | |
| return analytics_store.get(user, []) | |
| # === Summary for one user === | |
| def get_user_summary(user: str) -> Dict[str, Any]: | |
| events = analytics_store.get(user, []) | |
| if not events: | |
| return {"user": user, "total": 0, "last_seen": None, "actions": {}} | |
| action_counts = Counter(event["action"] for event in events) | |
| last_seen = max(event["timestamp"] for event in events) | |
| return { | |
| "user": user, | |
| "total_events": len(events), | |
| "last_seen": last_seen, | |
| "actions": dict(action_counts) | |
| } | |
| # === Summary for all users === | |
| def get_all_users_summary() -> List[Dict[str, Any]]: | |
| return [get_user_summary(user) for user in analytics_store] | |
| # === Top active users === | |
| def get_most_active_users(top_n: int = 10) -> List[Dict[str, Any]]: | |
| summaries = get_all_users_summary() | |
| sorted_users = sorted(summaries, key=lambda x: x["total_events"], reverse=True) | |
| return sorted_users[:top_n] | |
| # === Daily activity trends === | |
| def get_daily_activity(user: Optional[str] = None) -> Dict[str, int]: | |
| target_users = [user] if user else list(analytics_store.keys()) | |
| day_counts = Counter() | |
| for u in target_users: | |
| for event in analytics_store.get(u, []): | |
| day = event["timestamp"].split("T")[0] | |
| day_counts[day] += 1 | |
| return dict(day_counts) | |
| # === Clear individual or all user analytics === | |
| def clear_user_analytics(user: str): | |
| if user in analytics_store: | |
| del analytics_store[user] | |
| save_analytics() | |
| def clear_all_analytics(): | |
| analytics_store.clear() | |
| save_analytics() | |
| # === Load data at module import === | |
| load_analytics() | |