import gradio as gr
from gradio_client import Client, handle_file
import json
import os
import re
from datetime import datetime
from typing import List, Optional
from huggingface_hub import HfApi, hf_hub_download, list_repo_files
from pathlib import Path
import tempfile
import base64
from rembg import remove
from auth import verify_hf_token

# HuggingFace configuration
HF_TOKEN = os.getenv("HF_TOKEN")  # Required for writing to the dataset
DATASET_REPO = "Fraser/piclets"  # Public dataset repository
DATASET_TYPE = "dataset"

# Initialize HuggingFace API with token if available
api = HfApi(token=HF_TOKEN) if HF_TOKEN else HfApi()

# Cache directory for local operations
CACHE_DIR = Path("cache")
CACHE_DIR.mkdir(exist_ok=True)
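
# Dataset layout used throughout this module:
#   piclets/{object}.json  - canonical Piclet plus its variations
#   users/{sub}.json       - per-user profile keyed by HF user ID ("sub")
#   images/{name}.png      - generated Piclet artwork
#   metadata/stats.json    - global statistics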

class PicletDiscoveryService:
    """Manages Piclet discovery using HuggingFace datasets"""

    @staticmethod
    def normalize_object_name(name: str) -> str:
        """
        Normalize object names for consistent storage and lookup
        Examples: "The Blue Pillow" -> "blue_pillow", "wooden chairs" -> "wooden_chair"
        """
        if not name:
            return "unknown"
        # Convert to lowercase and strip
        name = name.lower().strip()
        # Remove leading articles (the, a, an)
        name = re.sub(r'^(the|a|an)\s+', '', name)
        # Remove special characters except spaces
        name = re.sub(r'[^a-z0-9\s]', '', name)
        # Handle common plurals (basic singularization rules)
        if name.endswith('ies') and len(name) > 4:
            name = name[:-3] + 'y'  # berries -> berry
        elif name.endswith('ves') and len(name) > 4:
            name = name[:-3] + 'f'  # leaves -> leaf
        elif name.endswith('es') and len(name) > 3:
            # Skip sibilant endings that are often not simple plurals
            if not name.endswith(('ses', 'xes', 'zes', 'ches', 'shes')):
                name = name[:-2]  # tomatoes -> tomato (but keep glasses, boxes)
        elif name.endswith('s') and len(name) > 2 and not name.endswith('ss'):
            name = name[:-1]  # chairs -> chair (but keep glass)
        # Replace spaces with underscores
        name = re.sub(r'\s+', '_', name.strip())
        return name
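
    # Illustrative results of the normalization above:
    #   "The Blue Pillows" -> "blue_pillow"
    #   "wooden chairs"    -> "wooden_chair"
    #   "Glasses"          -> "glasses"  (sibilant plural kept as-is)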

    @staticmethod
    def load_piclet_data(object_name: str) -> Optional[dict]:
        """Load Piclet data from the HuggingFace dataset"""
        try:
            normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
            file_path = f"piclets/{normalized_name}.json"
            # Download the file from HuggingFace
            local_path = hf_hub_download(
                repo_id=DATASET_REPO,
                filename=file_path,
                repo_type=DATASET_TYPE,
                token=HF_TOKEN,
                cache_dir=str(CACHE_DIR)
            )
            with open(local_path, 'r') as f:
                return json.load(f)
        except Exception as e:
            print(f"Could not load piclet data for {object_name}: {e}")
            return None

    @staticmethod
    def save_piclet_data(object_name: str, data: dict) -> bool:
        """Save Piclet data to the HuggingFace dataset"""
        try:
            normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
            file_path = f"piclets/{normalized_name}.json"
            # Create a temporary file
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
                json.dump(data, f, indent=2)
                temp_path = f.name
            # Upload to HuggingFace
            api.upload_file(
                path_or_fileobj=temp_path,
                path_in_repo=file_path,
                repo_id=DATASET_REPO,
                repo_type=DATASET_TYPE,
                commit_message=f"Update piclet: {normalized_name}"
            )
            # Clean up
            os.unlink(temp_path)
            return True
        except Exception as e:
            print(f"Failed to save piclet data: {e}")
            return False

    @staticmethod
    def load_user_data(sub: str) -> dict:
        """
        Load a user profile from the dataset by HF user ID (sub)
        Args:
            sub: HuggingFace user ID (stable identifier)
        Returns:
            User profile dict, or a default profile if not found
        """
        try:
            file_path = f"users/{sub}.json"
            local_path = hf_hub_download(
                repo_id=DATASET_REPO,
                filename=file_path,
                repo_type=DATASET_TYPE,
                token=HF_TOKEN,
                cache_dir=str(CACHE_DIR)
            )
            with open(local_path, 'r') as f:
                return json.load(f)
        except Exception:
            # Return a default user profile if not found
            # Will be populated with actual data on first save
            return {
                "sub": sub,
                "preferred_username": None,
                "name": None,
                "picture": None,
                "joinedAt": datetime.now().isoformat(),
                "lastSeen": datetime.now().isoformat(),
                "discoveries": [],
                "uniqueFinds": 0,
                "totalFinds": 0,
                "rarityScore": 0,
                "visibility": "public"
            }

    @staticmethod
    def save_user_data(sub: str, data: dict) -> bool:
        """
        Save a user profile to the dataset by HF user ID (sub)
        Args:
            sub: HuggingFace user ID (stable identifier)
            data: User profile dict
        Returns:
            True if successful, False otherwise
        """
        try:
            file_path = f"users/{sub}.json"
            # Update lastSeen timestamp
            data["lastSeen"] = datetime.now().isoformat()
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
                json.dump(data, f, indent=2)
                temp_path = f.name
            api.upload_file(
                path_or_fileobj=temp_path,
                path_in_repo=file_path,
                repo_id=DATASET_REPO,
                repo_type=DATASET_TYPE,
                commit_message=f"Update user profile: {data.get('preferred_username', sub)}"
            )
            os.unlink(temp_path)
            return True
        except Exception as e:
            print(f"Failed to save user data: {e}")
            return False
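
    # Minimal round-trip sketch (the "sub" value is illustrative):
    #   profile = PicletDiscoveryService.load_user_data("12345")
    #   profile["totalFinds"] += 1
    #   PicletDiscoveryService.save_user_data("12345", profile)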

    @staticmethod
    def get_or_create_user_profile(user_info: dict) -> dict:
        """
        Get an existing user profile or create a new one from OAuth user_info
        Refreshes cached profile data on each call
        Args:
            user_info: OAuth user info from HF (sub, preferred_username, name, picture)
        Returns:
            User profile dict
        """
        sub = user_info['sub']
        # Load existing profile
        profile = PicletDiscoveryService.load_user_data(sub)
        # Update cached profile fields from OAuth
        profile['sub'] = sub
        profile['preferred_username'] = user_info.get('preferred_username')
        profile['name'] = user_info.get('name')
        profile['picture'] = user_info.get('picture')
        profile['email'] = user_info.get('email')
        # Set joinedAt only if this is a new profile
        if 'joinedAt' not in profile or not profile['joinedAt']:
            profile['joinedAt'] = datetime.now().isoformat()
        return profile

    @staticmethod
    def update_global_stats() -> dict:
        """Return global statistics, falling back to zeroed defaults
        (currently read-only; updated counters are not yet written back)"""
        try:
            # Try to load existing stats
            try:
                local_path = hf_hub_download(
                    repo_id=DATASET_REPO,
                    filename="metadata/stats.json",
                    repo_type=DATASET_TYPE,
                    token=HF_TOKEN,
                    cache_dir=str(CACHE_DIR)
                )
                with open(local_path, 'r') as f:
                    stats = json.load(f)
            except Exception:
                stats = {
                    "totalDiscoveries": 0,
                    "uniqueObjects": 0,
                    "totalVariations": 0,
                    "lastUpdated": datetime.now().isoformat()
                }
            return stats
        except Exception as e:
            print(f"Failed to update global stats: {e}")
            return {}


class PicletGeneratorService:
    """
    Orchestrates Piclet generation by calling external AI services
    Uses the user's hf_token to consume their GPU quota
    """

    # Space endpoints
    JOY_CAPTION_SPACE = "fancyfeast/joy-caption-alpha-two"
    GPT_OSS_SPACE = "amd/gpt-oss-120b-chatbot"
    FLUX_SPACE = "black-forest-labs/FLUX.1-schnell"

    @staticmethod
    def generate_enhanced_caption(image_path: str, hf_token: str) -> str:
        """Generate a detailed image description using JoyCaption
        Args:
            image_path: Path to the image file
            hf_token: User's HuggingFace token
        """
        try:
            print("Connecting to JoyCaption space with user token...")
            client = Client(
                PicletGeneratorService.JOY_CAPTION_SPACE,
                hf_token=hf_token
            )
            print("Generating caption for image...")
            result = client.predict(
                handle_file(image_path),  # Wrap path so the client uploads the file
                "Descriptive",            # caption_type
                "medium-length",          # caption_length
                [],                       # extra_options
                "",                       # name_input
                "Describe this image in detail, identifying any recognizable objects, brands, logos, or specific models. Be specific about product names and types.",  # custom_prompt
                api_name="/stream_chat"
            )
            # JoyCaption returns a tuple: (prompt_used, caption_text)
            result_data = result.data if hasattr(result, 'data') else result
            caption = result_data[1] if isinstance(result_data, (list, tuple)) and len(result_data) > 1 else str(result_data)
            print(f"Caption generated: {caption[:100]}...")
            return caption
        except Exception as e:
            print(f"Failed to generate caption: {e}")
            raise Exception(f"Caption generation failed: {str(e)}")

    @staticmethod
    def generate_text_with_gpt(prompt: str, hf_token: str) -> str:
        """Generate text using GPT-OSS-120B"""
        try:
            print("Connecting to GPT-OSS space...")
            client = Client(
                PicletGeneratorService.GPT_OSS_SPACE,
                hf_token=hf_token
            )
            print("Generating text...")
            result = client.predict(
                api_name="/chat",
                message=prompt,
                history=[],
                system_prompt="You are a helpful assistant that creates Pokemon-style monster concepts based on real-world objects.",
                temperature=0.7
            )
            # Extract response text (GPT-OSS formats output with Analysis and Response sections)
            result_data = result.data if hasattr(result, 'data') else result
            response_text = result_data[0] if isinstance(result_data, (list, tuple)) else str(result_data)
            # Try to extract the Response section
            response_match = re.search(r'\*\*💬 Response:\*\*\s*\n\n([\s\S]*)', response_text)
            if response_match:
                return response_match.group(1).strip()
            # Fallback: extract everything after "assistantfinal"
            final_match = re.search(r'assistantfinal\s*([\s\S]*)', response_text)
            if final_match:
                return final_match.group(1).strip()
            return response_text
        except Exception as e:
            print(f"Failed to generate text: {e}")
            raise Exception(f"Text generation failed: {str(e)}")

    @staticmethod
    def generate_piclet_concept(caption: str, hf_token: str) -> dict:
        """
        Generate a complete Piclet concept from an image caption
        Returns the parsed concept with object name, variation, stats, etc.
        """
        concept_prompt = f"""You are analyzing an image to create a Pokemon-style creature. Here's the image description:
"{caption}"
Your task:
1. Identify the PRIMARY PHYSICAL OBJECT with SPECIFICITY (e.g., "macbook" not "laptop", "eiffel tower" not "tower", "iphone" not "phone", "starbucks mug" not "mug")
2. Determine if there's a meaningful VARIATION (e.g., "silver", "pro", "night", "gaming", "vintage")
3. Assess rarity based on uniqueness
4. Create a complete Pokemon-style monster concept
Format your response EXACTLY as follows:
```md
# Canonical Object
{{Specific object name: "macbook", "eiffel tower", "iphone", "tesla", "le creuset mug", "nintendo switch"}}
{{NOT generic terms like: "laptop", "tower", "phone", "car", "mug", "console"}}
{{Include brand/model/landmark name when identifiable}}
# Variation
{{OPTIONAL: one distinctive attribute like "silver", "pro", "night", "gaming", OR use "canonical" if this is the standard/default version with no special variation}}
# Object Rarity
{{common, uncommon, rare, epic, or legendary based on object uniqueness}}
# Monster Name
{{Creative 8-11 letter name based on the SPECIFIC object, e.g., "Macbyte" for MacBook, "Towerfell" for Eiffel Tower}}
# Primary Type
{{beast, bug, aquatic, flora, mineral, space, machina, structure, culture, or cuisine}}
# Physical Stats
Height: {{e.g., "1.2m" or "3'5\\""}}
Weight: {{e.g., "15kg" or "33 lbs"}}
# Personality
{{1-2 sentences describing personality traits}}
# Monster Description
{{2-3 paragraphs describing how the SPECIFIC object's features translate into monster features. Reference the actual object by name. This is the creature's bio.}}
# Monster Image Prompt
{{Concise visual description for anime-style image generation focusing on colors, shapes, and key features inspired by the specific object}}
```
CRITICAL RULES:
- Canonical Object MUST be SPECIFIC: "macbook" not "laptop", "big ben" not "clock tower", "coca cola" not "soda"
- If you can identify a brand, model, or proper name from the description, USE IT
- Variation should be meaningful and distinctive (material, style, color, context, or model variant)
- Monster Description must describe the CREATURE with references to the specific object's features
- Primary Type must match the object category (machina for electronics, structure for buildings, etc.)"""
        response_text = PicletGeneratorService.generate_text_with_gpt(concept_prompt, hf_token)
        # Parse the concept
        return PicletGeneratorService.parse_concept(response_text)

    @staticmethod
    def parse_concept(concept_text: str) -> dict:
        """Parse structured concept text into a dict"""
        # Remove code block markers if present
        if '```' in concept_text:
            code_block_match = re.search(r'```(?:md|markdown)?\s*\n([\s\S]*?)```', concept_text)
            if code_block_match:
                concept_text = code_block_match.group(1).strip()

        def extract_section(text: str, section: str) -> str:
            """Extract the content of a markdown section"""
            pattern = rf'\*{{0,2}}#\s*{re.escape(section)}\s*\*{{0,2}}\s*\n([\s\S]*?)(?=^\*{{0,2}}#|$)'
            match = re.search(pattern, text, re.MULTILINE)
            if match:
                content = match.group(1).strip()
                # Remove curly braces and quotes that GPT sometimes adds
                content = re.sub(r'^[{"]|["}]$', '', content)
                content = re.sub(r'^.*:\s*["\']|["\']$', '', content)
                return content.strip()
            return ''

        # Extract all sections
        object_name = extract_section(concept_text, 'Canonical Object').lower()
        variation_text = extract_section(concept_text, 'Variation')
        rarity_text = extract_section(concept_text, 'Object Rarity').lower()
        monster_name = extract_section(concept_text, 'Monster Name')
        primary_type = extract_section(concept_text, 'Primary Type').lower()
        description = extract_section(concept_text, 'Monster Description')
        image_prompt = extract_section(concept_text, 'Monster Image Prompt')
        # Parse physical stats
        physical_stats_text = extract_section(concept_text, 'Physical Stats')
        height_match = re.search(r'Height:\s*(.+)', physical_stats_text, re.IGNORECASE)
        weight_match = re.search(r'Weight:\s*(.+)', physical_stats_text, re.IGNORECASE)
        height = height_match.group(1).strip() if height_match else None
        weight = weight_match.group(1).strip() if weight_match else None
        personality = extract_section(concept_text, 'Personality')
        # Clean monster name
        if monster_name:
            monster_name = re.sub(r'\*+', '', monster_name)  # Remove asterisks
            if ',' in monster_name:
                monster_name = monster_name.split(',')[0]
            if len(monster_name) > 12:
                monster_name = monster_name[:12]
        # Parse variation
        attributes = []
        if variation_text and variation_text.lower() not in ['none', 'canonical', '']:
            attributes = [variation_text.lower()]
        # Map rarity to tier ('uncommon' must be checked before 'common',
        # since 'common' is a substring of 'uncommon')
        tier = 'medium'
        if 'uncommon' in rarity_text:
            tier = 'medium'
        elif 'common' in rarity_text:
            tier = 'low'
        elif 'rare' in rarity_text and 'epic' not in rarity_text:
            tier = 'high'
        elif 'legendary' in rarity_text or 'epic' in rarity_text or 'mythical' in rarity_text:
            tier = 'legendary'
        return {
            'objectName': object_name,
            'attributes': attributes,
            'concept': concept_text,
            'stats': {
                'name': monster_name or 'Unknown',
                'description': description,
                'tier': tier,
                'primaryType': primary_type or 'beast',
                'height': height,
                'weight': weight,
                'personality': personality
            },
            'imagePrompt': image_prompt
        }

    @staticmethod
    def generate_piclet_image(image_prompt: str, tier: str, hf_token: str) -> dict:
        """Generate a Piclet image using Flux"""
        try:
            print("Connecting to Flux space...")
            client = Client(
                PicletGeneratorService.FLUX_SPACE,
                hf_token=hf_token
            )
            tier_descriptions = {
                'low': 'simple and iconic design',
                'medium': 'detailed and well-crafted design',
                'high': 'highly detailed and impressive design with special effects',
                'legendary': 'highly detailed and majestic design with dramatic lighting and aura effects'
            }
            full_prompt = f"{image_prompt}\nNow generate a Pokémon-anime image of the monster in an idle pose with a plain dark-grey background. This is a {tier} tier monster with a {tier_descriptions.get(tier, tier_descriptions['medium'])}. The monster should not be attacking or in motion. The full monster must be visible within the frame."
            print(f"Generating image with prompt: {full_prompt[:100]}...")
            result = client.predict(
                full_prompt,  # prompt
                0,            # seed
                True,         # randomize_seed
                1024,         # width
                1024,         # height
                4,            # num_inference_steps
                api_name="/infer"
            )
            # Extract image URL and seed
            result_data = result.data if hasattr(result, 'data') else result
            image_data = result_data[0] if isinstance(result_data, (list, tuple)) else result_data
            seed = result_data[1] if isinstance(result_data, (list, tuple)) and len(result_data) > 1 else 0
            # Handle different return formats
            image_url = None
            if isinstance(image_data, str):
                image_url = image_data
            elif isinstance(image_data, dict):
                image_url = image_data.get('url') or image_data.get('path')
            if not image_url:
                raise Exception("Failed to extract image URL from Flux response")
            return {
                'imageUrl': image_url,
                'seed': seed,
                'prompt': image_prompt
            }
        except Exception as e:
            print(f"Failed to generate image: {e}")
            raise Exception(f"Image generation failed: {str(e)}")

    @staticmethod
    def remove_background(image_path: str) -> str:
        """
        Remove the background from an image using rembg
        Args:
            image_path: Path to the input image file
        Returns:
            Base64-encoded PNG (as a data URI) with a transparent background
        """
        try:
            print("Removing background from image...")
            # Read the image bytes
            with open(image_path, 'rb') as f:
                input_image = f.read()
            # Remove background using rembg
            output_image = remove(input_image)
            # Convert to base64
            base64_image = base64.b64encode(output_image).decode('utf-8')
            print("Background removal completed")
            return f"data:image/png;base64,{base64_image}"
        except Exception as e:
            print(f"Failed to remove background: {e}")
            raise Exception(f"Background removal failed: {str(e)}")

    @staticmethod
    def upload_image_to_dataset(image_data: str, file_name: str) -> str:
        """
        Upload an image to the HuggingFace dataset
        Args:
            image_data: Base64-encoded image data (with or without data URI prefix)
            file_name: Name for the file (e.g., "pillow_canonical.png")
        Returns:
            URL of the uploaded image in the dataset
        """
        try:
            print(f"Uploading image to dataset: {file_name}")
            # Remove data URI prefix if present
            if image_data.startswith('data:'):
                image_data = image_data.split(',', 1)[1]
            # Decode base64 to bytes
            image_bytes = base64.b64decode(image_data)
            # Create a temporary file
            with tempfile.NamedTemporaryFile(mode='wb', suffix='.png', delete=False) as f:
                f.write(image_bytes)
                temp_path = f.name
            # Upload to HuggingFace dataset
            file_path = f"images/{file_name}"
            api.upload_file(
                path_or_fileobj=temp_path,
                path_in_repo=file_path,
                repo_id=DATASET_REPO,
                repo_type=DATASET_TYPE,
                commit_message=f"Add piclet image: {file_name}"
            )
            # Clean up temp file
            os.unlink(temp_path)
            # Return the dataset URL
            dataset_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{file_path}"
            print(f"Image uploaded successfully: {dataset_url}")
            return dataset_url
        except Exception as e:
            print(f"Failed to upload image: {e}")
            raise Exception(f"Image upload failed: {str(e)}")


# API Endpoints

def search_piclet(object_name: str, attributes: List[str]) -> dict:
    """
    Search for a canonical Piclet or its variations
    Returns the matching piclet or None
    """
    piclet_data = PicletDiscoveryService.load_piclet_data(object_name)
    if not piclet_data:
        return {
            "status": "new",
            "message": f"No Piclet found for '{object_name}'",
            "piclet": None
        }
    # Check if searching for canonical (no attributes)
    if not attributes or len(attributes) == 0:
        return {
            "status": "existing",
            "message": f"Found canonical Piclet for '{object_name}'",
            "piclet": piclet_data.get("canonical")
        }
    # Search for a matching variation
    variations = piclet_data.get("variations", [])
    for variation in variations:
        var_attrs = set(variation.get("attributes", []))
        search_attrs = set(attributes)
        # Check for a close match (at least 50% of searched attributes present)
        overlap = len(var_attrs.intersection(search_attrs))
        if overlap >= len(search_attrs) * 0.5:
            return {
                "status": "variation",
                "message": f"Found variation of '{object_name}'",
                "piclet": variation,
                "canonicalId": piclet_data["canonical"]["typeId"]
            }
    # No variation found, suggest creating one
    return {
        "status": "new_variation",
        "message": f"No variation found for '{object_name}' with attributes {attributes}",
        "canonicalId": piclet_data["canonical"]["typeId"],
        "piclet": None
    }
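
# Illustrative outcomes of search_piclet (actual status depends on dataset contents):
#   search_piclet("pillow", [])       -> {"status": "existing" or "new", ...}
#   search_piclet("pillow", ["blue"]) -> {"status": "variation" or "new_variation", ...}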


def create_canonical(object_name: str, piclet_data: str, token_or_username: str) -> dict:
    """
    Create a new canonical Piclet
    Args:
        object_name: The normalized object name (e.g., "pillow")
        piclet_data: JSON string of Piclet instance data
        token_or_username: Either an OAuth token (starts with "hf_") or a username for testing
    Returns:
        Dict with success status and piclet data
    """
    try:
        piclet_json = json.loads(piclet_data) if isinstance(piclet_data, str) else piclet_data
        # Determine if this is a token or a username
        user_info = None
        if token_or_username and token_or_username.startswith('hf_'):
            # OAuth token - verify it
            user_info = verify_hf_token(token_or_username)
            if not user_info:
                return {
                    "success": False,
                    "error": "Invalid OAuth token"
                }
        else:
            # Legacy username mode (for testing)
            user_info = {
                "sub": f"legacy_{token_or_username}",
                "preferred_username": token_or_username,
                "name": token_or_username,
                "picture": None
            }
        # Get or create user profile
        user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info)
        # Create canonical entry with full discoverer info
        canonical_data = {
            "canonical": {
                "objectName": object_name,
                "typeId": f"{PicletDiscoveryService.normalize_object_name(object_name)}_canonical",
                "discoveredBy": user_info['preferred_username'],
                "discovererSub": user_info['sub'],
                "discovererUsername": user_info['preferred_username'],
                "discovererName": user_info.get('name'),
                "discovererPicture": user_info.get('picture'),
                "discoveredAt": datetime.now().isoformat(),
                "scanCount": 1,
                "picletData": piclet_json
            },
            "variations": []
        }
        # Save to dataset
        if PicletDiscoveryService.save_piclet_data(object_name, canonical_data):
            # Update user profile
            user_profile["discoveries"].append(canonical_data["canonical"]["typeId"])
            user_profile["uniqueFinds"] += 1
            user_profile["totalFinds"] += 1
            user_profile["rarityScore"] += 100  # Bonus for canonical discovery
            PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)
            return {
                "success": True,
                "message": f"Created canonical Piclet for '{object_name}'",
                "piclet": canonical_data["canonical"]
            }
        else:
            return {
                "success": False,
                "error": "Failed to save canonical Piclet"
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


def create_variation(canonical_id: str, attributes: List[str], piclet_data: str, token_or_username: str, object_name: str) -> dict:
    """
    Create a variation of an existing canonical Piclet with OAuth verification
    Args:
        canonical_id: ID of the canonical Piclet
        attributes: List of variation attributes
        piclet_data: JSON data for the Piclet
        token_or_username: Either an OAuth token (starts with "hf_") or a username for testing
        object_name: Normalized object name
    Returns:
        Success/error dict with variation data
    """
    try:
        piclet_json = json.loads(piclet_data) if isinstance(piclet_data, str) else piclet_data
        # Verify token or use legacy mode
        user_info = None
        if token_or_username and token_or_username.startswith('hf_'):
            user_info = verify_hf_token(token_or_username)
            if not user_info:
                return {"success": False, "error": "Invalid OAuth token"}
        else:
            # Legacy mode for testing
            user_info = {
                "sub": f"legacy_{token_or_username}",
                "preferred_username": token_or_username,
                "name": token_or_username,
                "picture": None
            }
        # Get or create user profile
        user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info)
        # Load existing data
        existing_data = PicletDiscoveryService.load_piclet_data(object_name)
        if not existing_data:
            return {
                "success": False,
                "error": f"Canonical Piclet not found for '{object_name}'"
            }
        # Create variation entry
        variation_id = f"{PicletDiscoveryService.normalize_object_name(object_name)}_{len(existing_data['variations']) + 1:03d}"
        variation = {
            "typeId": variation_id,
            "attributes": attributes,
            "discoveredBy": user_info['preferred_username'],
            "discovererSub": user_info['sub'],
            "discovererUsername": user_info['preferred_username'],
            "discovererName": user_info.get('name'),
            "discovererPicture": user_info.get('picture'),
            "discoveredAt": datetime.now().isoformat(),
            "scanCount": 1,
            "picletData": piclet_json
        }
        # Add to variations
        existing_data["variations"].append(variation)
        # Save updated data
        if PicletDiscoveryService.save_piclet_data(object_name, existing_data):
            # Update user profile
            user_profile["discoveries"].append(variation_id)
            user_profile["totalFinds"] += 1
            user_profile["rarityScore"] += 50  # Bonus for variation discovery
            PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)
            return {
                "success": True,
                "message": f"Created variation of '{object_name}'",
                "piclet": variation
            }
        else:
            return {
                "success": False,
                "error": "Failed to save variation"
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


def increment_scan_count(piclet_id: str, object_name: str) -> dict:
    """
    Increment the scan count for a Piclet
    """
    try:
        data = PicletDiscoveryService.load_piclet_data(object_name)
        if not data:
            return {
                "success": False,
                "error": "Piclet not found"
            }
        # Check canonical
        if data["canonical"]["typeId"] == piclet_id:
            data["canonical"]["scanCount"] = data["canonical"].get("scanCount", 0) + 1
            scan_count = data["canonical"]["scanCount"]
        else:
            # Check variations (for/else: the else runs only when no variation matched)
            for variation in data["variations"]:
                if variation["typeId"] == piclet_id:
                    variation["scanCount"] = variation.get("scanCount", 0) + 1
                    scan_count = variation["scanCount"]
                    break
            else:
                return {
                    "success": False,
                    "error": "Piclet ID not found"
                }
        # Save updated data
        if PicletDiscoveryService.save_piclet_data(object_name, data):
            return {
                "success": True,
                "scanCount": scan_count
            }
        else:
            return {
                "success": False,
                "error": "Failed to update scan count"
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


def generate_piclet(image, hf_token: str) -> dict:
    """
    Complete Piclet generation workflow - single endpoint
    Takes the user's image and hf_token, returns the generated Piclet with discovery status
    Args:
        image: Uploaded image file (Gradio file input)
        hf_token: User's HuggingFace OAuth token
    Returns:
        {
            "success": bool,
            "piclet": {complete piclet data},
            "discoveryStatus": "new" | "variation" | "existing",
            "canonicalId": str (if variation/existing),
            "message": str
        }
    """
    try:
        # Validate token and get user info
        user_info = verify_hf_token(hf_token)
        if not user_info:
            return {
                "success": False,
                "error": "Invalid HuggingFace token"
            }
        print(f"Generating Piclet for user: {user_info.get('preferred_username', 'unknown')}")
        # Get user profile (created if it doesn't exist)
        user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info)
        # Get image path from Gradio (type="filepath" gives us a string path)
        image_path = image if isinstance(image, str) else str(image)
        # Step 1: Generate caption
        print("Step 1/5: Generating image caption...")
        caption = PicletGeneratorService.generate_enhanced_caption(image_path, hf_token)
        # Step 2: Generate concept
        print("Step 2/5: Generating Piclet concept...")
        concept_data = PicletGeneratorService.generate_piclet_concept(caption, hf_token)
        object_name = concept_data['objectName']
        attributes = concept_data['attributes']
        stats = concept_data['stats']
        image_prompt = concept_data['imagePrompt']
        concept_text = concept_data['concept']
        # Step 3: Generate image
        print("Step 3/5: Generating Piclet image...")
        image_result = PicletGeneratorService.generate_piclet_image(
            image_prompt,
            stats['tier'],
            hf_token
        )
        # Step 3.5: Remove background from the generated image
        print("Step 3.5/5: Processing image (removing background)...")
        image_local_path = image_result['imageUrl']
        # Handle both local paths and URLs
        if image_local_path.startswith('http'):
            # If it's a URL, download it first (timeout guards against a hung request)
            import requests
            response = requests.get(image_local_path, timeout=60)
            with tempfile.NamedTemporaryFile(mode='wb', suffix='.png', delete=False) as f:
                f.write(response.content)
                image_local_path = f.name
        # Remove background
        transparent_image_base64 = PicletGeneratorService.remove_background(image_local_path)
        # Store the base64 data in image_result for later use
        image_result['imageData'] = transparent_image_base64

        # Step 4: Check for canonical/variation
        print("Step 4/5: Checking for existing canonical...")
        existing_data = PicletDiscoveryService.load_piclet_data(object_name)
        discovery_status = 'new'
        canonical_id = None
        scan_count = 1
        if existing_data:
            # Check if this is an exact canonical match (no attributes)
            if not attributes or len(attributes) == 0:
                discovery_status = 'existing'
                canonical_id = existing_data['canonical']['typeId']
                # Increment scan count
                existing_data['canonical']['scanCount'] = existing_data['canonical'].get('scanCount', 0) + 1
                scan_count = existing_data['canonical']['scanCount']
                PicletDiscoveryService.save_piclet_data(object_name, existing_data)
            else:
                # Check for a matching variation
                variations = existing_data.get('variations', [])
                matched_variation = None
                for variation in variations:
                    var_attrs = set(variation.get('attributes', []))
                    search_attrs = set(attributes)
                    overlap = len(var_attrs.intersection(search_attrs))
                    if overlap >= len(search_attrs) * 0.5:
                        matched_variation = variation
                        discovery_status = 'existing'
                        canonical_id = existing_data['canonical']['typeId']
                        # Increment variation scan count
                        variation['scanCount'] = variation.get('scanCount', 0) + 1
                        scan_count = variation['scanCount']
                        PicletDiscoveryService.save_piclet_data(object_name, existing_data)
                        break
                if not matched_variation:
                    discovery_status = 'variation'
                    canonical_id = existing_data['canonical']['typeId']

        # Step 5: Save new discovery if needed
        print("Step 5/5: Saving to dataset...")
        if discovery_status == 'new':
            # Create new canonical
            type_id = f"{PicletDiscoveryService.normalize_object_name(object_name)}_canonical"
            # Upload image to dataset with the canonical filename
            normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
            image_filename = f"{normalized_name}_canonical.png"
            dataset_image_url = PicletGeneratorService.upload_image_to_dataset(
                image_result['imageData'],
                image_filename
            )
            canonical_data = {
                "canonical": {
                    "objectName": object_name,
                    "typeId": type_id,
                    "discoveredBy": user_info['preferred_username'],
                    "discovererSub": user_info['sub'],
                    "discovererUsername": user_info['preferred_username'],
                    "discovererName": user_info.get('name'),
                    "discovererPicture": user_info.get('picture'),
                    "discoveredAt": datetime.now().isoformat(),
                    "scanCount": scan_count,
                    "picletData": {
                        "typeId": type_id,
                        "nickname": stats['name'],
                        "stats": stats,
                        "imageUrl": dataset_image_url,
                        "imageData": image_result['imageData'],
                        "imageCaption": caption,
                        "concept": concept_text,
                        "imagePrompt": image_prompt,
                        "createdAt": datetime.now().isoformat()
                    }
                },
                "variations": []
            }
            canonical_id = type_id
            PicletDiscoveryService.save_piclet_data(object_name, canonical_data)
            # Update user profile
            user_profile["discoveries"].append(type_id)
            user_profile["uniqueFinds"] = user_profile.get("uniqueFinds", 0) + 1
            user_profile["totalFinds"] = user_profile.get("totalFinds", 0) + 1
            user_profile["rarityScore"] = user_profile.get("rarityScore", 0) + 100
            PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)
        elif discovery_status == 'variation':
            # Create new variation
            existing_data = PicletDiscoveryService.load_piclet_data(object_name)
            variation_num = len(existing_data['variations']) + 1
            normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
            variation_id = f"{normalized_name}_{variation_num:03d}"
            # Upload image to dataset with the variation filename
            image_filename = f"{normalized_name}_{variation_num:03d}.png"
            dataset_image_url = PicletGeneratorService.upload_image_to_dataset(
                image_result['imageData'],
                image_filename
            )
            variation_data = {
                "typeId": variation_id,
                "attributes": attributes,
                "discoveredBy": user_info['preferred_username'],
                "discovererSub": user_info['sub'],
                "discovererUsername": user_info['preferred_username'],
                "discovererName": user_info.get('name'),
                "discovererPicture": user_info.get('picture'),
                "discoveredAt": datetime.now().isoformat(),
                "scanCount": scan_count,
                "picletData": {
                    "typeId": variation_id,
                    "nickname": stats['name'],
                    "stats": stats,
                    "imageUrl": dataset_image_url,
                    "imageData": image_result['imageData'],
                    "imageCaption": caption,
                    "concept": concept_text,
                    "imagePrompt": image_prompt,
                    "createdAt": datetime.now().isoformat()
                }
            }
            existing_data['variations'].append(variation_data)
            PicletDiscoveryService.save_piclet_data(object_name, existing_data)
            # Update user profile
            user_profile["discoveries"].append(variation_id)
            user_profile["totalFinds"] = user_profile.get("totalFinds", 0) + 1
            user_profile["rarityScore"] = user_profile.get("rarityScore", 0) + 50
            PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)

        # Build the complete response
        # For existing piclets, return the stored data; for new/variation, use the generated data
        if discovery_status == 'existing':
            # Load the existing piclet data to return
            existing_piclet_data = PicletDiscoveryService.load_piclet_data(object_name)
            if existing_piclet_data and existing_piclet_data.get('canonical'):
                existing_canonical = existing_piclet_data['canonical']
                piclet_data = existing_canonical.get('picletData', {})
                piclet_data['discoveryStatus'] = discovery_status
                piclet_data['scanCount'] = existing_canonical.get('scanCount', 1)
            else:
                # Fallback if data not found
                piclet_data = {
                    "typeId": canonical_id,
                    "nickname": stats['name'],
                    "stats": stats,
                    "imageUrl": image_result.get('imageUrl', ''),
                    "imageData": image_result.get('imageData', ''),
                    "imageCaption": caption,
                    "concept": concept_text,
                    "imagePrompt": image_prompt,
                    "objectName": object_name,
                    "attributes": attributes,
                    "discoveryStatus": discovery_status,
                    "scanCount": scan_count,
                    "createdAt": datetime.now().isoformat()
                }
        else:
            # For new and variation, determine the correct dataset URL
            normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
            if discovery_status == 'new':
                image_filename = f"{normalized_name}_canonical.png"
            else:  # variation
                existing_data = PicletDiscoveryService.load_piclet_data(object_name)
                variation_num = len(existing_data.get('variations', []))
                image_filename = f"{normalized_name}_{variation_num:03d}.png"
            dataset_image_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/images/{image_filename}"
            piclet_data = {
                "typeId": canonical_id,
                "nickname": stats['name'],
                "stats": stats,
                "imageUrl": dataset_image_url,
                "imageData": image_result.get('imageData', ''),
                "imageCaption": caption,
                "concept": concept_text,
                "imagePrompt": image_prompt,
                "objectName": object_name,
                "attributes": attributes,
                "discoveryStatus": discovery_status,
                "scanCount": scan_count,
                "createdAt": datetime.now().isoformat()
            }
        messages = {
            'new': f"Congratulations! You discovered the first {object_name} Piclet!",
            'variation': f"You found a new variation of {object_name}!",
            'existing': f"You encountered a known {object_name} Piclet."
        }
        return {
            "success": True,
            "piclet": piclet_data,
            "discoveryStatus": discovery_status,
            "canonicalId": canonical_id,
            "message": messages.get(discovery_status, "Piclet generated!")
        }
    except Exception as e:
        print(f"Failed to generate Piclet: {e}")
        import traceback
        traceback.print_exc()
        return {
            "success": False,
            "error": str(e)
        }


def get_object_details(object_name: str) -> dict:
    """
    Get complete details for an object (canonical + all variations)
    Args:
        object_name: The object name (e.g., "pillow", "macbook")
    Returns:
        {
            "success": bool,
            "objectName": str,
            "canonical": {canonical data},
            "variations": [list of variations],
            "totalScans": int
        }
    """
    try:
        # Load the object data
        piclet_data = PicletDiscoveryService.load_piclet_data(object_name)
        if not piclet_data:
            return {
                "success": False,
                "error": f"No piclet found for object '{object_name}'",
                "objectName": object_name
            }
        # Calculate total scans across canonical and variations
        total_scans = piclet_data['canonical'].get('scanCount', 0)
        for variation in piclet_data.get('variations', []):
            total_scans += variation.get('scanCount', 0)
        return {
            "success": True,
            "objectName": object_name,
            "canonical": piclet_data['canonical'],
            "variations": piclet_data.get('variations', []),
            "totalScans": total_scans,
            "variationCount": len(piclet_data.get('variations', []))
        }
    except Exception as e:
        print(f"Failed to get object details: {e}")
        return {
            "success": False,
            "error": str(e),
            "objectName": object_name
        }


def get_user_piclets(hf_token: str) -> dict:
    """
    Get all Piclets discovered by a specific user
    Args:
        hf_token: User's HuggingFace OAuth token
    Returns:
        {
            "success": bool,
            "piclets": [list of piclet discoveries],
            "stats": {user stats}
        }
    """
    try:
        # Verify token and get user info
        user_info = verify_hf_token(hf_token)
        if not user_info:
            return {
                "success": False,
                "error": "Invalid HuggingFace token",
                "piclets": []
            }
        # Load user profile
        user_profile = PicletDiscoveryService.load_user_data(user_info['sub'])
        # Get the list of discoveries
        discoveries = user_profile.get('discoveries', [])
        piclets = []
        # Load each discovered piclet
        for type_id in discoveries:
            # Extract the object name from the type_id (e.g., "pillow_canonical" -> "pillow")
            object_name = type_id.rsplit('_', 1)[0]
            # Load the piclet data
            piclet_data = PicletDiscoveryService.load_piclet_data(object_name)
            if piclet_data:
                # Check whether it's the canonical or a variation
                if piclet_data['canonical']['typeId'] == type_id:
                    piclets.append({
                        'type': 'canonical',
                        'typeId': type_id,
                        'objectName': object_name,
                        'discoveredAt': piclet_data['canonical']['discoveredAt'],
                        'scanCount': piclet_data['canonical'].get('scanCount', 1),
                        'picletData': piclet_data['canonical'].get('picletData', {})
                    })
                else:
                    # Find the matching variation
                    for variation in piclet_data.get('variations', []):
                        if variation['typeId'] == type_id:
                            piclets.append({
                                'type': 'variation',
                                'typeId': type_id,
                                'objectName': object_name,
                                'attributes': variation.get('attributes', []),
                                'discoveredAt': variation['discoveredAt'],
                                'scanCount': variation.get('scanCount', 1),
                                'picletData': variation.get('picletData', {})
                            })
                            break
        # Sort by discovery date (most recent first)
        piclets.sort(key=lambda x: x.get('discoveredAt', ''), reverse=True)
        return {
            "success": True,
            "piclets": piclets,
            "stats": {
                "username": user_info.get('preferred_username'),
                "name": user_info.get('name'),
                "picture": user_info.get('picture'),
                "totalFinds": user_profile.get('totalFinds', 0),
                "uniqueFinds": user_profile.get('uniqueFinds', 0),
                "rarityScore": user_profile.get('rarityScore', 0),
                "joinedAt": user_profile.get('joinedAt')
            }
        }
    except Exception as e:
        print(f"Failed to get user piclets: {e}")
        return {
            "success": False,
            "error": str(e),
            "piclets": []
        }


def get_recent_activity(limit: int = 20) -> dict:
    """
    Get recent discoveries across all users
    """
    try:
        activities = []
        # List all piclet files
        try:
            files = list_repo_files(
                repo_id=DATASET_REPO,
                repo_type=DATASET_TYPE,
                token=HF_TOKEN
            )
            piclet_files = [f for f in files if f.startswith("piclets/") and f.endswith(".json")]
        except Exception:
            piclet_files = []
        # Load recent piclets (simplified - in production, maintain a separate activity log)
        for file_path in piclet_files[-limit:]:
            try:
                object_name = file_path.replace("piclets/", "").replace(".json", "")
                data = PicletDiscoveryService.load_piclet_data(object_name)
                if data:
                    # Add the canonical discovery
                    canonical = data["canonical"]
                    activities.append({
                        "type": "discovery",
                        "objectName": object_name,
                        "typeId": canonical["typeId"],
                        "discoveredBy": canonical["discoveredBy"],
                        "discoveredAt": canonical["discoveredAt"],
                        "scanCount": canonical.get("scanCount", 1)
                    })
                    # Add recent variations
                    for variation in data.get("variations", [])[-5:]:
                        activities.append({
                            "type": "variation",
                            "objectName": object_name,
                            "typeId": variation["typeId"],
                            "attributes": variation["attributes"],
                            "discoveredBy": variation["discoveredBy"],
                            "discoveredAt": variation["discoveredAt"],
                            "scanCount": variation.get("scanCount", 1)
                        })
            except Exception:
                continue
        # Sort by discovery date (most recent first)
        activities.sort(key=lambda x: x.get("discoveredAt", ""), reverse=True)
        return {
            "success": True,
            "activities": activities[:limit]
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "activities": []
        }


def get_leaderboard(limit: int = 10) -> dict:
    """
    Get the top discoverers
    """
    try:
        leaderboard = []
        # List all user files
        try:
            files = list_repo_files(
                repo_id=DATASET_REPO,
                repo_type=DATASET_TYPE,
                token=HF_TOKEN
            )
            user_files = [f for f in files if f.startswith("users/") and f.endswith(".json")]
        except Exception:
            user_files = []
        # Load user data (user files are keyed by HF user ID, i.e. "sub")
        for file_path in user_files:
            try:
                sub = file_path.replace("users/", "").replace(".json", "")
                user_data = PicletDiscoveryService.load_user_data(sub)
                leaderboard.append({
                    # Prefer the cached display username; fall back to the raw ID
                    "username": user_data.get("preferred_username") or sub,
                    "totalFinds": user_data.get("totalFinds", 0),
                    "uniqueFinds": user_data.get("uniqueFinds", 0),
                    "rarityScore": user_data.get("rarityScore", 0)
                })
            except Exception:
                continue
        # Sort by rarity score
        leaderboard.sort(key=lambda x: x["rarityScore"], reverse=True)
        # Add ranks
        for i, entry in enumerate(leaderboard[:limit]):
            entry["rank"] = i + 1
        return {
            "success": True,
            "leaderboard": leaderboard[:limit]
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "leaderboard": []
        }


# Create Gradio interface
with gr.Blocks(title="Piclets Discovery Server") as app:
    gr.Markdown("""
    # Piclets Discovery Server
    Backend service for the Piclets discovery game. Each real-world object has ONE canonical Piclet!
    """)
    with gr.Tab("Generate Piclet"):
        gr.Markdown("""
        ## 🎮 Complete Piclet Generator
        Upload an image and provide your HuggingFace token to generate a complete Piclet.
        This endpoint handles the entire workflow: captioning, concept generation, image creation, and dataset storage.
        """)
        with gr.Row():
            with gr.Column():
                gen_image = gr.Image(label="Upload Image", type="filepath")
                gen_token = gr.Textbox(label="HuggingFace Token", placeholder="hf_...", type="password")
                gen_btn = gr.Button("Generate Piclet", variant="primary")
            with gr.Column():
                gen_result = gr.JSON(label="Generated Piclet Result")
        gen_btn.click(
            fn=generate_piclet,
            inputs=[gen_image, gen_token],
            outputs=gen_result
        )
| with gr.Tab("My Piclets"): | |
| gr.Markdown(""" | |
| ## ๐ Your Discovery Collection | |
| View all Piclets you've discovered (includes your stats). | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| my_token = gr.Textbox(label="HuggingFace Token", placeholder="hf_...", type="password") | |
| my_btn = gr.Button("Get My Piclets", variant="primary") | |
| with gr.Column(): | |
| my_result = gr.JSON(label="My Piclets") | |
| my_btn.click( | |
| fn=get_user_piclets, | |
| inputs=my_token, | |
| outputs=my_result | |
| ) | |
| with gr.Tab("Object Details"): | |
| gr.Markdown(""" | |
| ## ๐ View Object Details | |
| Get complete information about an object (canonical + all variations). | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| obj_name = gr.Textbox(label="Object Name", placeholder="e.g., pillow, macbook") | |
| obj_btn = gr.Button("Get Details", variant="primary") | |
| with gr.Column(): | |
| obj_result = gr.JSON(label="Object Details") | |
| obj_btn.click( | |
| fn=get_object_details, | |
| inputs=obj_name, | |
| outputs=obj_result | |
| ) | |
| with gr.Tab("Recent Activity"): | |
| activity_limit = gr.Slider(5, 50, value=20, label="Number of Activities") | |
| activity_btn = gr.Button("Get Recent Activity") | |
| activity_result = gr.JSON(label="Recent Discoveries") | |
| activity_btn.click( | |
| fn=get_recent_activity, | |
| inputs=activity_limit, | |
| outputs=activity_result | |
| ) | |
| with gr.Tab("Leaderboard"): | |
| leader_limit = gr.Slider(5, 20, value=10, label="Top N Discoverers") | |
| leader_btn = gr.Button("Get Leaderboard") | |
| leader_result = gr.JSON(label="Top Discoverers") | |
| leader_btn.click( | |
| fn=get_leaderboard, | |
| inputs=leader_limit, | |
| outputs=leader_result | |
| ) | |
    # API Documentation
    gr.Markdown("""
    ## Public API Endpoints
    All endpoints return JSON responses. The frontend only needs these 5 endpoints:
    ### 1. **generate_piclet** (Scanner)
    Complete Piclet generation workflow.
    - Input: `image` (File), `hf_token` (string)
    - Output: Generated Piclet with discovery status
    ### 2. **get_user_piclets** (User Collection)
    Get the user's discovered Piclets and stats.
    - Input: `hf_token` (string)
    - Output: List of Piclets + user stats (total/unique finds, rarity score)
    ### 3. **get_object_details** (Object Data)
    Get complete object info (canonical + all variations).
    - Input: `object_name` (string)
    - Output: Canonical + variations + total scans
    ### 4. **get_recent_activity** (Activity Feed)
    Recent discoveries across all users.
    - Input: `limit` (int, default 20)
    - Output: Recent discoveries with timestamps
    ### 5. **get_leaderboard** (Top Users)
    Top discoverers by rarity score.
    - Input: `limit` (int, default 10)
    - Output: Ranked users with stats
    ---
    *Note: Internal helper functions (search_piclet, create_canonical, etc.) are used by generate_piclet but not exposed to the frontend.*
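
    Example client call from Python (a sketch; the Space ID below is a placeholder, and Gradio derives the endpoint name `/generate_piclet` from the function name):

    ```python
    from gradio_client import Client, handle_file

    client = Client("<owner>/<space-name>")  # hypothetical Space ID
    result = client.predict(
        handle_file("photo.jpg"),  # image to scan
        "hf_xxx",                  # your HuggingFace token
        api_name="/generate_piclet",
    )
    print(result["discoveryStatus"], result["message"])
    ```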
| """) | |
| if __name__ == "__main__": | |
| # Protect web UI with authentication while allowing API access | |
| admin_password = os.getenv("ADMIN_PASSWORD", "changeme") | |
| # Configure for HuggingFace Space environment | |
| app.launch() | |