import gradio as gr
from gradio_client import Client, handle_file
import json
import os
import re
from datetime import datetime
from typing import List, Optional
from huggingface_hub import HfApi, hf_hub_download, list_repo_files
from pathlib import Path
import tempfile

from auth import verify_hf_token

# HuggingFace configuration
HF_TOKEN = os.getenv("HF_TOKEN")  # Required for writing to dataset
DATASET_REPO = "Fraser/piclets"  # Public dataset repository
DATASET_TYPE = "dataset"

# Initialize HuggingFace API with token if available
api = HfApi(token=HF_TOKEN) if HF_TOKEN else HfApi()

# Cache directory for local operations
CACHE_DIR = Path("cache")
CACHE_DIR.mkdir(exist_ok=True)
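
# Layout of the dataset repo as used below (illustrative sketch inferred from
# the file paths in this module; nothing here enforces it):
#   piclets/{object_name}.json      - canonical Piclet plus its variations
#   users/{sub}.json                - per-user profile keyed by HF user ID (sub)
#   images/{object}_canonical.png   - artwork for canonical Piclets
#   images/{object}_{NNN}.png       - artwork for numbered variations
#   metadata/stats.json             - global statistics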


class PicletDiscoveryService:
    """Manages Piclet discovery using HuggingFace datasets"""

    @staticmethod
    def normalize_object_name(name: str) -> str:
        """
        Normalize object names for consistent storage and lookup
        Examples: "The Blue Pillow" -> "blue_pillow", "wooden chairs" -> "wooden_chair"
        """
        if not name:
            return "unknown"

        # Convert to lowercase and strip
        name = name.lower().strip()

        # Remove articles (the, a, an)
        name = re.sub(r'^(the|a|an)\s+', '', name)

        # Remove special characters except spaces
        name = re.sub(r'[^a-z0-9\s]', '', name)

        # Handle common plurals (basic pluralization rules)
        if name.endswith('ies') and len(name) > 4:
            name = name[:-3] + 'y'  # berries -> berry
        elif name.endswith('ves') and len(name) > 4:
            name = name[:-3] + 'f'  # leaves -> leaf
        elif name.endswith('es') and len(name) > 3:
            # Skip special endings such as "glasses" or "boxes"
            if not name.endswith(('ses', 'xes', 'zes', 'ches', 'shes')):
                name = name[:-2]  # tomatoes -> tomato (keeps glasses, boxes)
        elif name.endswith('s') and len(name) > 2 and not name.endswith('ss'):
            name = name[:-1]  # chairs -> chair (but keep glass)

        # Replace spaces with underscores
        name = re.sub(r'\s+', '_', name.strip())

        return name

    @staticmethod
    def load_piclet_data(object_name: str) -> Optional[dict]:
        """Load Piclet data from HuggingFace dataset"""
        try:
            normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
            file_path = f"piclets/{normalized_name}.json"

            # Download the file from HuggingFace
            local_path = hf_hub_download(
                repo_id=DATASET_REPO,
                filename=file_path,
                repo_type=DATASET_TYPE,
                token=HF_TOKEN,
                cache_dir=str(CACHE_DIR)
            )

            with open(local_path, 'r') as f:
                return json.load(f)
        except Exception as e:
            print(f"Could not load piclet data for {object_name}: {e}")
            return None

    @staticmethod
    def save_piclet_data(object_name: str, data: dict) -> bool:
        """Save Piclet data to HuggingFace dataset"""
        try:
            normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
            file_path = f"piclets/{normalized_name}.json"

            # Create a temporary file
            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
                json.dump(data, f, indent=2)
                temp_path = f.name

            # Upload to HuggingFace
            api.upload_file(
                path_or_fileobj=temp_path,
                path_in_repo=file_path,
                repo_id=DATASET_REPO,
                repo_type=DATASET_TYPE,
                commit_message=f"Update piclet: {normalized_name}"
            )

            # Clean up
            os.unlink(temp_path)
            return True
        except Exception as e:
            print(f"Failed to save piclet data: {e}")
            return False

    @staticmethod
    def load_user_data(sub: str) -> dict:
        """
        Load user profile from dataset by HF user ID (sub)

        Args:
            sub: HuggingFace user ID (stable identifier)

        Returns:
            User profile dict or default profile if not found
        """
        try:
            file_path = f"users/{sub}.json"
            local_path = hf_hub_download(
                repo_id=DATASET_REPO,
                filename=file_path,
                repo_type=DATASET_TYPE,
                token=HF_TOKEN,
                cache_dir=str(CACHE_DIR)
            )
            with open(local_path, 'r') as f:
                return json.load(f)
        except Exception:
            # Return default user profile if not found
            # Will be populated with actual data on first save
            return {
                "sub": sub,
                "preferred_username": None,
                "name": None,
                "picture": None,
                "joinedAt": datetime.now().isoformat(),
                "lastSeen": datetime.now().isoformat(),
                "discoveries": [],
                "uniqueFinds": 0,
                "totalFinds": 0,
                "rarityScore": 0,
                "visibility": "public"
            }

    @staticmethod
    def save_user_data(sub: str, data: dict) -> bool:
        """
        Save user profile to dataset by HF user ID (sub)

        Args:
            sub: HuggingFace user ID (stable identifier)
            data: User profile dict

        Returns:
            True if successful, False otherwise
        """
        try:
            file_path = f"users/{sub}.json"

            # Update lastSeen timestamp
            data["lastSeen"] = datetime.now().isoformat()

            with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
                json.dump(data, f, indent=2)
                temp_path = f.name

            api.upload_file(
                path_or_fileobj=temp_path,
                path_in_repo=file_path,
                repo_id=DATASET_REPO,
                repo_type=DATASET_TYPE,
                commit_message=f"Update user profile: {data.get('preferred_username', sub)}"
            )

            os.unlink(temp_path)
            return True
        except Exception as e:
            print(f"Failed to save user data: {e}")
            return False

    @staticmethod
    def get_or_create_user_profile(user_info: dict) -> dict:
        """
        Get existing user profile or create a new one from OAuth user_info
        Refreshes cached profile data on each call

        Args:
            user_info: OAuth user info from HF (sub, preferred_username, name, picture)

        Returns:
            User profile dict
        """
        sub = user_info['sub']

        # Load existing profile (or default template)
        profile = PicletDiscoveryService.load_user_data(sub)

        # Update cached profile fields from OAuth
        profile['sub'] = sub
        profile['preferred_username'] = user_info.get('preferred_username')
        profile['name'] = user_info.get('name')
        profile['picture'] = user_info.get('picture')
        profile['email'] = user_info.get('email')

        # Set joinedAt only if this is a new profile
        if 'joinedAt' not in profile or not profile['joinedAt']:
            profile['joinedAt'] = datetime.now().isoformat()

        return profile

    @staticmethod
    def update_global_stats() -> dict:
        """Load and return global statistics (currently read-only)"""
        try:
            # Try to load existing stats
            try:
                local_path = hf_hub_download(
                    repo_id=DATASET_REPO,
                    filename="metadata/stats.json",
                    repo_type=DATASET_TYPE,
                    token=HF_TOKEN,
                    cache_dir=str(CACHE_DIR)
                )
                with open(local_path, 'r') as f:
                    stats = json.load(f)
            except Exception:
                stats = {
                    "totalDiscoveries": 0,
                    "uniqueObjects": 0,
                    "totalVariations": 0,
                    "lastUpdated": datetime.now().isoformat()
                }

            return stats
        except Exception as e:
            print(f"Failed to update global stats: {e}")
            return {}
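
# Illustrative outputs of PicletDiscoveryService.normalize_object_name, derived
# from the rules above:
#   "The Blue Pillow" -> "blue_pillow"   (leading article dropped, spaces -> underscores)
#   "wooden chairs"   -> "wooden_chair"  (simple trailing 's' plural trimmed)
#   "Glasses"         -> "glasses"       (special plural endings are kept)
#   "MacBook Pro!"    -> "macbook_pro"   (punctuation stripped)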


class PicletGeneratorService:
    """
    Orchestrates Piclet generation by calling external AI services
    Uses user's hf_token to consume their GPU quota
    """

    # Space endpoints
    JOY_CAPTION_SPACE = "fancyfeast/joy-caption-alpha-two"
    GPT_OSS_SPACE = "amd/gpt-oss-120b-chatbot"
    FLUX_SPACE = "black-forest-labs/FLUX.1-schnell"

    @staticmethod
    def generate_enhanced_caption(image_path: str, hf_token: str) -> str:
        """Generate detailed image description using JoyCaption

        Args:
            image_path: Path to image file
            hf_token: User's HuggingFace token
        """
        try:
            print("Connecting to JoyCaption space with user token...")
            client = Client(
                PicletGeneratorService.JOY_CAPTION_SPACE,
                hf_token=hf_token
            )

            print("Generating caption for image...")
            result = client.predict(
                handle_file(image_path),  # Wrap path so client uploads file
                "Descriptive",  # caption_type
                "medium-length",  # caption_length
                [],  # extra_options
                "",  # name_input
                "Describe this image in detail, identifying any recognizable objects, brands, logos, or specific models. Be specific about product names and types.",  # custom_prompt
                api_name="/stream_chat"
            )

            # JoyCaption returns tuple: (prompt_used, caption_text) in .data
            result_data = result.data if hasattr(result, 'data') else result
            caption = result_data[1] if isinstance(result_data, (list, tuple)) and len(result_data) > 1 else str(result_data)

            print(f"Caption generated: {caption[:100]}...")
            return caption
        except Exception as e:
            print(f"Failed to generate caption: {e}")
            raise Exception(f"Caption generation failed: {str(e)}")

    @staticmethod
    def generate_text_with_gpt(prompt: str, hf_token: str) -> str:
        """Generate text using GPT-OSS-120B"""
        try:
            print("Connecting to GPT-OSS space...")
            client = Client(
                PicletGeneratorService.GPT_OSS_SPACE,
                hf_token=hf_token
            )

            print("Generating text...")
            result = client.predict(
                api_name="/chat",
                message=prompt,
                history=[],
                system_prompt="You are a helpful assistant that creates Pokemon-style monster concepts based on real-world objects.",
                temperature=0.7
            )

            # Extract response text (GPT-OSS formats with Analysis and Response)
            result_data = result.data if hasattr(result, 'data') else result
            response_text = result_data[0] if isinstance(result_data, (list, tuple)) else str(result_data)

            # Try to extract Response section
            response_match = re.search(r'\*\*💬 Response:\*\*\s*\n\n([\s\S]*)', response_text)
            if response_match:
                return response_match.group(1).strip()

            # Fallback: extract after "assistantfinal"
            final_match = re.search(r'assistantfinal\s*([\s\S]*)', response_text)
            if final_match:
                return final_match.group(1).strip()

            return response_text
        except Exception as e:
            print(f"Failed to generate text: {e}")
            raise Exception(f"Text generation failed: {str(e)}")

    @staticmethod
    def generate_piclet_concept(caption: str, hf_token: str) -> dict:
        """
        Generate complete Piclet concept from image caption
        Returns parsed concept with object name, variation, stats, etc.
        """
        concept_prompt = f"""You are analyzing an image to create a Pokemon-style creature. Here's the image description:

"{caption}"

Your task:
1. Identify the PRIMARY PHYSICAL OBJECT with SPECIFICITY (e.g., "macbook" not "laptop", "eiffel tower" not "tower", "iphone" not "phone", "starbucks mug" not "mug")
2. Determine if there's a meaningful VARIATION (e.g., "silver", "pro", "night", "gaming", "vintage")
3. Assess rarity based on uniqueness
4. Create a complete Pokemon-style monster concept

Format your response EXACTLY as follows:

```md
# Canonical Object
{{Specific object name: "macbook", "eiffel tower", "iphone", "tesla", "le creuset mug", "nintendo switch"}}
{{NOT generic terms like: "laptop", "tower", "phone", "car", "mug", "console"}}
{{Include brand/model/landmark name when identifiable}}

# Variation
{{OPTIONAL: one distinctive attribute like "silver", "pro", "night", "gaming", OR use "canonical" if this is the standard/default version with no special variation}}

# Object Rarity
{{common, uncommon, rare, epic, or legendary based on object uniqueness}}

# Monster Name
{{Creative 8-11 letter name based on the SPECIFIC object, e.g., "Macbyte" for MacBook, "Towerfell" for Eiffel Tower}}

# Primary Type
{{beast, bug, aquatic, flora, mineral, space, machina, structure, culture, or cuisine}}

# Physical Stats
Height: {{e.g., "1.2m" or "3'5\\""}}
Weight: {{e.g., "15kg" or "33 lbs"}}

# Personality
{{1-2 sentences describing personality traits}}

# Monster Description
{{2-3 paragraphs describing how the SPECIFIC object's features translate into monster features. Reference the actual object by name. This is the creature's bio.}}

# Monster Image Prompt
{{Concise visual description for anime-style image generation focusing on colors, shapes, and key features inspired by the specific object}}
```

CRITICAL RULES:
- Canonical Object MUST be SPECIFIC: "macbook" not "laptop", "big ben" not "clock tower", "coca cola" not "soda"
- If you can identify a brand, model, or proper name from the description, USE IT
- Variation should be meaningful and distinctive (material, style, color, context, or model variant)
- Monster Description must describe the CREATURE with references to the specific object's features
- Primary Type must match the object category (machina for electronics, structure for buildings, etc.)"""

        response_text = PicletGeneratorService.generate_text_with_gpt(concept_prompt, hf_token)

        # Parse the concept
        return PicletGeneratorService.parse_concept(response_text)

    @staticmethod
    def parse_concept(concept_text: str) -> dict:
        """Parse structured concept text into dict"""
        # Remove code block markers if present
        if '```' in concept_text:
            code_block_match = re.search(r'```(?:md|markdown)?\s*\n([\s\S]*?)```', concept_text)
            if code_block_match:
                concept_text = code_block_match.group(1).strip()

        def extract_section(text: str, section: str) -> str:
            """Extract content of a markdown section"""
            pattern = rf'\*{{0,2}}#\s*{re.escape(section)}\s*\*{{0,2}}\s*\n([\s\S]*?)(?=^\*{{0,2}}#|$)'
            match = re.search(pattern, text, re.MULTILINE)
            if match:
                content = match.group(1).strip()
                # Remove curly braces and quotes that GPT sometimes adds
                content = re.sub(r'^[{"]|["}]$', '', content)
                content = re.sub(r'^.*:\s*["\']|["\']$', '', content)
                return content.strip()
            return ''

        # Extract all sections
        object_name = extract_section(concept_text, 'Canonical Object').lower()
        variation_text = extract_section(concept_text, 'Variation')
        rarity_text = extract_section(concept_text, 'Object Rarity').lower()
        monster_name = extract_section(concept_text, 'Monster Name')
        primary_type = extract_section(concept_text, 'Primary Type').lower()
        description = extract_section(concept_text, 'Monster Description')
        image_prompt = extract_section(concept_text, 'Monster Image Prompt')

        # Parse physical stats
        physical_stats_text = extract_section(concept_text, 'Physical Stats')
        height_match = re.search(r'Height:\s*(.+)', physical_stats_text, re.IGNORECASE)
        weight_match = re.search(r'Weight:\s*(.+)', physical_stats_text, re.IGNORECASE)
        height = height_match.group(1).strip() if height_match else None
        weight = weight_match.group(1).strip() if weight_match else None

        personality = extract_section(concept_text, 'Personality')

        # Clean monster name
        if monster_name:
            monster_name = re.sub(r'\*+', '', monster_name)  # Remove asterisks
            if ',' in monster_name:
                monster_name = monster_name.split(',')[0]
            if len(monster_name) > 12:
                monster_name = monster_name[:12]

        # Parse variation
        attributes = []
        if variation_text and variation_text.lower() not in ['none', 'canonical', '']:
            attributes = [variation_text.lower()]

        # Map rarity to tier (check the most specific labels first so that
        # "uncommon" is not matched by the "common" substring test)
        tier = 'medium'
        if 'legendary' in rarity_text or 'epic' in rarity_text or 'mythical' in rarity_text:
            tier = 'legendary'
        elif 'rare' in rarity_text:
            tier = 'high'
        elif 'uncommon' in rarity_text:
            tier = 'medium'
        elif 'common' in rarity_text:
            tier = 'low'

        return {
            'objectName': object_name,
            'attributes': attributes,
            'concept': concept_text,
            'stats': {
                'name': monster_name or 'Unknown',
                'description': description,
                'tier': tier,
                'primaryType': primary_type or 'beast',
                'height': height,
                'weight': weight,
                'personality': personality
            },
            'imagePrompt': image_prompt
        }
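
    # Illustrative parse_concept example (hypothetical, abridged input):
    #     # Canonical Object
    #     macbook
    #     # Variation
    #     silver
    #     # Object Rarity
    #     uncommon
    # would yield roughly:
    #     {'objectName': 'macbook', 'attributes': ['silver'],
    #      'stats': {'tier': 'medium', ...}, 'imagePrompt': ...}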

    @staticmethod
    def generate_piclet_image(image_prompt: str, tier: str, hf_token: str) -> dict:
        """Generate Piclet image using Flux"""
        try:
            print("Connecting to Flux space...")
            client = Client(
                PicletGeneratorService.FLUX_SPACE,
                hf_token=hf_token
            )

            tier_descriptions = {
                'low': 'simple and iconic design',
                'medium': 'detailed and well-crafted design',
                'high': 'highly detailed and impressive design with special effects',
                'legendary': 'highly detailed and majestic design with dramatic lighting and aura effects'
            }
            background_styles = {
                'low': 'simple natural environment',
                'medium': 'detailed thematic natural environment',
                'high': 'atmospheric thematic environment with special lighting effects',
                'legendary': 'epic thematic environment with dramatic atmospheric lighting and mystical aura'
            }

            full_prompt = f"{image_prompt}\nNow generate a Pokémon Anime image of the monster in an idle pose with a {background_styles.get(tier, background_styles['medium'])} background that complements the creature's type and characteristics. This is a {tier} tier monster with a {tier_descriptions.get(tier, tier_descriptions['medium'])}. The monster should not be attacking or in motion. The full monster must be visible and centered in the frame."

            print(f"Generating image with prompt: {full_prompt[:100]}...")
            result = client.predict(
                full_prompt,  # prompt
                0,  # seed
                True,  # randomize_seed
                1024,  # width
                1024,  # height
                4,  # num_inference_steps
                api_name="/infer"
            )

            # Extract image URL and seed
            result_data = result.data if hasattr(result, 'data') else result
            image_data = result_data[0] if isinstance(result_data, (list, tuple)) else result_data
            seed = result_data[1] if isinstance(result_data, (list, tuple)) and len(result_data) > 1 else 0

            # Handle different return formats
            image_url = None
            if isinstance(image_data, str):
                image_url = image_data
            elif isinstance(image_data, dict):
                image_url = image_data.get('url') or image_data.get('path')

            if not image_url:
                raise Exception("Failed to extract image URL from Flux response")

            return {
                'imageUrl': image_url,
                'seed': seed,
                'prompt': image_prompt
            }
        except Exception as e:
            print(f"Failed to generate image: {e}")
            raise Exception(f"Image generation failed: {str(e)}")

    @staticmethod
    def upload_image_to_dataset(image_path: str, file_name: str) -> str:
        """
        Upload image to HuggingFace dataset

        Args:
            image_path: Local path to the image file (or URL to download from)
            file_name: Name for the file (e.g., "pillow_canonical.png")

        Returns:
            URL to the uploaded image in the dataset
        """
        try:
            print(f"Uploading image to dataset: {file_name}")

            # Handle both local paths and URLs
            if image_path.startswith('http'):
                # Download from URL first
                import requests
                response = requests.get(image_path)
                with tempfile.NamedTemporaryFile(mode='wb', suffix='.png', delete=False) as f:
                    f.write(response.content)
                    temp_path = f.name
            else:
                # Use local path directly
                temp_path = image_path

            # Upload to HuggingFace dataset
            file_path = f"images/{file_name}"
            api.upload_file(
                path_or_fileobj=temp_path,
                path_in_repo=file_path,
                repo_id=DATASET_REPO,
                repo_type=DATASET_TYPE,
                commit_message=f"Add piclet image: {file_name}"
            )

            # Clean up temp file if we downloaded it
            if image_path.startswith('http'):
                os.unlink(temp_path)

            # Return the dataset URL
            dataset_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{file_path}"
            print(f"Image uploaded successfully: {dataset_url}")
            return dataset_url
        except Exception as e:
            print(f"Failed to upload image: {e}")
            raise Exception(f"Image upload failed: {str(e)}")


# API Endpoints

def search_piclet(object_name: str, attributes: List[str]) -> dict:
    """
    Search for canonical Piclet or variations
    Returns matching piclet or None
    """
    piclet_data = PicletDiscoveryService.load_piclet_data(object_name)

    if not piclet_data:
        return {
            "status": "new",
            "message": f"No Piclet found for '{object_name}'",
            "piclet": None
        }

    # Check if searching for canonical (no attributes)
    if not attributes or len(attributes) == 0:
        return {
            "status": "existing",
            "message": f"Found canonical Piclet for '{object_name}'",
            "piclet": piclet_data.get("canonical")
        }

    # Search for matching variation
    variations = piclet_data.get("variations", [])
    for variation in variations:
        var_attrs = set(variation.get("attributes", []))
        search_attrs = set(attributes)

        # Check for close match (at least 50% overlap)
        overlap = len(var_attrs.intersection(search_attrs))
        if overlap >= len(search_attrs) * 0.5:
            return {
                "status": "variation",
                "message": f"Found variation of '{object_name}'",
                "piclet": variation,
                "canonicalId": piclet_data["canonical"]["typeId"]
            }

    # No variation found, suggest creating one
    return {
        "status": "new_variation",
        "message": f"No variation found for '{object_name}' with attributes {attributes}",
        "canonicalId": piclet_data["canonical"]["typeId"],
        "piclet": None
    }
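
# Example of the variation-matching rule above (illustrative): a search with
# attributes ["silver"] against a stored variation tagged ["silver", "pro"]
# has an overlap of 1, which meets the 0.5 * len(search_attrs) threshold,
# so that variation is returned rather than "new_variation".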


def create_canonical(object_name: str, piclet_data: str, token_or_username: str) -> dict:
    """
    Create a new canonical Piclet

    Args:
        object_name: The normalized object name (e.g., "pillow")
        piclet_data: JSON string of Piclet instance data
        token_or_username: Either OAuth token (starts with "hf_") or username for testing

    Returns:
        Dict with success status and piclet data
    """
    try:
        piclet_json = json.loads(piclet_data) if isinstance(piclet_data, str) else piclet_data

        # Determine if this is a token or username
        user_info = None
        if token_or_username and token_or_username.startswith('hf_'):
            # OAuth token - verify it
            user_info = verify_hf_token(token_or_username)
            if not user_info:
                return {
                    "success": False,
                    "error": "Invalid OAuth token"
                }
        else:
            # Legacy username mode (for testing)
            user_info = {
                "sub": f"legacy_{token_or_username}",
                "preferred_username": token_or_username,
                "name": token_or_username,
                "picture": None
            }

        # Get or create user profile
        user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info)

        # Create canonical entry with full discoverer info
        canonical_data = {
            "canonical": {
                "objectName": object_name,
                "typeId": f"{PicletDiscoveryService.normalize_object_name(object_name)}_canonical",
                "discoveredBy": user_info['preferred_username'],
                "discovererSub": user_info['sub'],
                "discovererUsername": user_info['preferred_username'],
                "discovererName": user_info.get('name'),
                "discovererPicture": user_info.get('picture'),
                "discoveredAt": datetime.now().isoformat(),
                "scanCount": 1,
                "picletData": piclet_json
            },
            "variations": []
        }

        # Save to dataset
        if PicletDiscoveryService.save_piclet_data(object_name, canonical_data):
            # Update user profile
            user_profile["discoveries"].append(canonical_data["canonical"]["typeId"])
            user_profile["uniqueFinds"] += 1
            user_profile["totalFinds"] += 1
            user_profile["rarityScore"] += 100  # Bonus for canonical discovery
            PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)

            return {
                "success": True,
                "message": f"Created canonical Piclet for '{object_name}'",
                "piclet": canonical_data["canonical"]
            }
        else:
            return {
                "success": False,
                "error": "Failed to save canonical Piclet"
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


def create_variation(canonical_id: str, attributes: List[str], piclet_data: str, token_or_username: str, object_name: str) -> dict:
    """
    Create a variation of an existing canonical Piclet with OAuth verification

    Args:
        canonical_id: ID of the canonical Piclet
        attributes: List of variation attributes
        piclet_data: JSON data for the Piclet
        token_or_username: Either OAuth token (starts with "hf_") or username for testing
        object_name: Normalized object name

    Returns:
        Success/error dict with variation data
    """
    try:
        piclet_json = json.loads(piclet_data) if isinstance(piclet_data, str) else piclet_data

        # Verify token or use legacy mode
        user_info = None
        if token_or_username and token_or_username.startswith('hf_'):
            user_info = verify_hf_token(token_or_username)
            if not user_info:
                return {"success": False, "error": "Invalid OAuth token"}
        else:
            # Legacy mode for testing
            user_info = {
                "sub": f"legacy_{token_or_username}",
                "preferred_username": token_or_username,
                "name": token_or_username,
                "picture": None
            }

        # Get or create user profile
        user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info)

        # Load existing data
        existing_data = PicletDiscoveryService.load_piclet_data(object_name)
        if not existing_data:
            return {
                "success": False,
                "error": f"Canonical Piclet not found for '{object_name}'"
            }

        # Create variation entry
        variation_id = f"{PicletDiscoveryService.normalize_object_name(object_name)}_{len(existing_data['variations']) + 1:03d}"
        variation = {
            "typeId": variation_id,
            "attributes": attributes,
            "discoveredBy": user_info['preferred_username'],
            "discovererSub": user_info['sub'],
            "discovererUsername": user_info['preferred_username'],
            "discovererName": user_info.get('name'),
            "discovererPicture": user_info.get('picture'),
            "discoveredAt": datetime.now().isoformat(),
            "scanCount": 1,
            "picletData": piclet_json
        }

        # Add to variations
        existing_data["variations"].append(variation)

        # Save updated data
        if PicletDiscoveryService.save_piclet_data(object_name, existing_data):
            # Update user profile
            user_profile["discoveries"].append(variation_id)
            user_profile["totalFinds"] += 1
            user_profile["rarityScore"] += 50  # Bonus for variation discovery
            PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)

            return {
                "success": True,
                "message": f"Created variation of '{object_name}'",
                "piclet": variation
            }
        else:
            return {
                "success": False,
                "error": "Failed to save variation"
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


def increment_scan_count(piclet_id: str, object_name: str) -> dict:
    """
    Increment the scan count for a Piclet
    """
    try:
        data = PicletDiscoveryService.load_piclet_data(object_name)
        if not data:
            return {
                "success": False,
                "error": "Piclet not found"
            }

        # Check canonical
        if data["canonical"]["typeId"] == piclet_id:
            data["canonical"]["scanCount"] = data["canonical"].get("scanCount", 0) + 1
            scan_count = data["canonical"]["scanCount"]
        else:
            # Check variations
            for variation in data["variations"]:
                if variation["typeId"] == piclet_id:
                    variation["scanCount"] = variation.get("scanCount", 0) + 1
                    scan_count = variation["scanCount"]
                    break
            else:
                return {
                    "success": False,
                    "error": "Piclet ID not found"
                }

        # Save updated data
        if PicletDiscoveryService.save_piclet_data(object_name, data):
            return {
                "success": True,
                "scanCount": scan_count
            }
        else:
            return {
                "success": False,
                "error": "Failed to update scan count"
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }


def generate_piclet(image, hf_token: str) -> dict:
    """
    Complete Piclet generation workflow - single endpoint
    Takes user's image and hf_token, returns generated Piclet with discovery status

    Args:
        image: Uploaded image file (Gradio file input)
        hf_token: User's HuggingFace OAuth token

    Returns:
        {
            "success": bool,
            "piclet": {complete piclet data},
            "discoveryStatus": "new" | "variation" | "existing",
            "canonicalId": str (if variation/existing),
            "message": str
        }
    """
    try:
        # Validate token and get user info
        user_info = verify_hf_token(hf_token)
        if not user_info:
            return {
                "success": False,
                "error": "Invalid HuggingFace token"
            }

        print(f"Generating Piclet for user: {user_info.get('preferred_username', 'unknown')}")

        # Get user profile (creates if doesn't exist)
        user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info)

        # Get image path from Gradio (type="filepath" gives us a string path)
        image_path = image if isinstance(image, str) else str(image)

        # Step 1: Generate caption
        print("Step 1/5: Generating image caption...")
        caption = PicletGeneratorService.generate_enhanced_caption(image_path, hf_token)

        # Step 2: Generate concept
        print("Step 2/5: Generating Piclet concept...")
        concept_data = PicletGeneratorService.generate_piclet_concept(caption, hf_token)

        object_name = concept_data['objectName']
        attributes = concept_data['attributes']
        stats = concept_data['stats']
        image_prompt = concept_data['imagePrompt']
        concept_text = concept_data['concept']

        # Step 3: Generate image
        print("Step 3/5: Generating Piclet image...")
        image_result = PicletGeneratorService.generate_piclet_image(
            image_prompt,
            stats['tier'],
            hf_token
        )

        # Step 4: Check for canonical/variation
        print("Step 4/5: Checking for existing canonical...")
        existing_data = PicletDiscoveryService.load_piclet_data(object_name)

        discovery_status = 'new'
        canonical_id = None
        scan_count = 1

        if existing_data:
            # Check if this is an exact canonical match (no attributes)
            if not attributes or len(attributes) == 0:
                discovery_status = 'existing'
                canonical_id = existing_data['canonical']['typeId']
                # Increment scan count
                existing_data['canonical']['scanCount'] = existing_data['canonical'].get('scanCount', 0) + 1
                scan_count = existing_data['canonical']['scanCount']
                PicletDiscoveryService.save_piclet_data(object_name, existing_data)
            else:
                # Check for matching variation
                variations = existing_data.get('variations', [])
                matched_variation = None

                for variation in variations:
                    var_attrs = set(variation.get('attributes', []))
                    search_attrs = set(attributes)
                    overlap = len(var_attrs.intersection(search_attrs))

                    if overlap >= len(search_attrs) * 0.5:
                        matched_variation = variation
                        discovery_status = 'existing'
                        canonical_id = existing_data['canonical']['typeId']
                        # Increment variation scan count
                        variation['scanCount'] = variation.get('scanCount', 0) + 1
                        scan_count = variation['scanCount']
                        PicletDiscoveryService.save_piclet_data(object_name, existing_data)
                        break

                if not matched_variation:
                    discovery_status = 'variation'
                    canonical_id = existing_data['canonical']['typeId']

        # Step 5: Save new discovery if needed
        print("Step 5/5: Saving to dataset...")

        if discovery_status == 'new':
            # Create new canonical
            type_id = f"{PicletDiscoveryService.normalize_object_name(object_name)}_canonical"

            # Upload image to dataset with canonical filename
            normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
            image_filename = f"{normalized_name}_canonical.png"
            dataset_image_url = PicletGeneratorService.upload_image_to_dataset(
                image_result['imageUrl'],
                image_filename
            )

            canonical_data = {
                "canonical": {
                    "objectName": object_name,
                    "typeId": type_id,
                    "discoveredBy": user_info['preferred_username'],
                    "discovererSub": user_info['sub'],
                    "discovererUsername": user_info['preferred_username'],
                    "discovererName": user_info.get('name'),
                    "discovererPicture": user_info.get('picture'),
                    "discoveredAt": datetime.now().isoformat(),
                    "scanCount": scan_count,
                    "picletData": {
                        "typeId": type_id,
                        "nickname": stats['name'],
                        "stats": stats,
                        "imageUrl": dataset_image_url,
                        "imageCaption": caption,
                        "concept": concept_text,
                        "imagePrompt": image_prompt,
                        "createdAt": datetime.now().isoformat()
                    }
                },
                "variations": []
            }

            canonical_id = type_id
            PicletDiscoveryService.save_piclet_data(object_name, canonical_data)

            # Update user profile
            user_profile["discoveries"].append(type_id)
            user_profile["uniqueFinds"] = user_profile.get("uniqueFinds", 0) + 1
            user_profile["totalFinds"] = user_profile.get("totalFinds", 0) + 1
            user_profile["rarityScore"] = user_profile.get("rarityScore", 0) + 100
            PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)

        elif discovery_status == 'variation':
            # Create new variation
            existing_data = PicletDiscoveryService.load_piclet_data(object_name)
            variation_num = len(existing_data['variations']) + 1
            normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
            variation_id = f"{normalized_name}_{variation_num:03d}"

            # Upload image to dataset with variation filename
            image_filename = f"{normalized_name}_{variation_num:03d}.png"
            dataset_image_url = PicletGeneratorService.upload_image_to_dataset(
                image_result['imageUrl'],
                image_filename
            )

            variation_data = {
                "typeId": variation_id,
                "attributes": attributes,
                "discoveredBy": user_info['preferred_username'],
                "discovererSub": user_info['sub'],
                "discovererUsername": user_info['preferred_username'],
                "discovererName": user_info.get('name'),
                "discovererPicture": user_info.get('picture'),
                "discoveredAt": datetime.now().isoformat(),
                "scanCount": scan_count,
                "picletData": {
                    "typeId": variation_id,
                    "nickname": stats['name'],
                    "stats": stats,
                    "imageUrl": dataset_image_url,
                    "imageCaption": caption,
                    "concept": concept_text,
                    "imagePrompt": image_prompt,
                    "createdAt": datetime.now().isoformat()
                }
            }

            existing_data['variations'].append(variation_data)
            PicletDiscoveryService.save_piclet_data(object_name, existing_data)

            # Update user profile
            user_profile["discoveries"].append(variation_id)
            user_profile["totalFinds"] = user_profile.get("totalFinds", 0) + 1
            user_profile["rarityScore"] = user_profile.get("rarityScore", 0) + 50
            PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)

        # Build complete response
        # For existing piclets, get the stored data; for new/variation, use generated data
        if discovery_status == 'existing':
            # Load the existing piclet data to return
            existing_piclet_data = PicletDiscoveryService.load_piclet_data(object_name)
            if existing_piclet_data and existing_piclet_data.get('canonical'):
                existing_canonical = existing_piclet_data['canonical']
                piclet_data = existing_canonical.get('picletData', {})
                piclet_data['discoveryStatus'] = discovery_status
                piclet_data['scanCount'] = existing_canonical.get('scanCount', 1)
            else:
                # Fallback if data not found
                piclet_data = {
                    "typeId": canonical_id,
                    "nickname": stats['name'],
                    "stats": stats,
                    "imageUrl": image_result.get('imageUrl', ''),
                    "imageCaption": caption,
                    "concept": concept_text,
                    "imagePrompt": image_prompt,
                    "objectName": object_name,
                    "attributes": attributes,
                    "discoveryStatus": discovery_status,
                    "scanCount": scan_count,
                    "createdAt": datetime.now().isoformat()
                }
        else:
            # For new and variation, determine the correct dataset URL
            if discovery_status == 'new':
                normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
                image_filename = f"{normalized_name}_canonical.png"
            else:  # variation
                normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
                existing_data = PicletDiscoveryService.load_piclet_data(object_name)
                variation_num = len(existing_data.get('variations', []))
                image_filename = f"{normalized_name}_{variation_num:03d}.png"

            dataset_image_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/images/{image_filename}"

            piclet_data = {
                "typeId": canonical_id,
                "nickname": stats['name'],
                "stats": stats,
                "imageUrl": dataset_image_url,
                "imageCaption": caption,
                "concept": concept_text,
                "imagePrompt": image_prompt,
                "objectName": object_name,
                "attributes": attributes,
                "discoveryStatus": discovery_status,
                "scanCount": scan_count,
                "createdAt": datetime.now().isoformat()
            }

        messages = {
            'new': f"Congratulations! You discovered the first {object_name} Piclet!",
            'variation': f"You found a new variation of {object_name}!",
            'existing': f"You encountered a known {object_name} Piclet."
        }

        return {
            "success": True,
            "piclet": piclet_data,
            "discoveryStatus": discovery_status,
            "canonicalId": canonical_id,
            "message": messages.get(discovery_status, "Piclet generated!")
        }

    except Exception as e:
        print(f"Failed to generate Piclet: {e}")
        import traceback
        traceback.print_exc()
        return {
            "success": False,
            "error": str(e)
        }


def get_object_details(object_name: str) -> dict:
    """
    Get complete details for an object (canonical + all variations)

    Args:
        object_name: The object name (e.g., "pillow", "macbook")

    Returns:
        {
            "success": bool,
            "objectName": str,
            "canonical": {canonical data},
            "variations": [list of variations],
            "totalScans": int
        }
    """
    try:
        # Load the object data
        piclet_data = PicletDiscoveryService.load_piclet_data(object_name)

        if not piclet_data:
            return {
                "success": False,
                "error": f"No piclet found for object '{object_name}'",
                "objectName": object_name
            }

        # Calculate total scans across canonical and variations
        total_scans = piclet_data['canonical'].get('scanCount', 0)
        for variation in piclet_data.get('variations', []):
            total_scans += variation.get('scanCount', 0)

        return {
            "success": True,
            "objectName": object_name,
            "canonical": piclet_data['canonical'],
            "variations": piclet_data.get('variations', []),
            "totalScans": total_scans,
            "variationCount": len(piclet_data.get('variations', []))
        }
    except Exception as e:
        print(f"Failed to get object details: {e}")
        return {
            "success": False,
            "error": str(e),
            "objectName": object_name
        }


def get_user_piclets(hf_token: str) -> dict:
    """
    Get all Piclets discovered by a specific user

    Args:
        hf_token: User's HuggingFace OAuth token

    Returns:
        {
            "success": bool,
            "piclets": [list of piclet discoveries],
            "stats": {user stats}
        }
    """
    try:
        # Verify token and get user info
        user_info = verify_hf_token(hf_token)
        if not user_info:
            return {
                "success": False,
                "error": "Invalid HuggingFace token",
                "piclets": []
            }

        # Load user profile
        user_profile = PicletDiscoveryService.load_user_data(user_info['sub'])

        # Get list of discoveries
        discoveries = user_profile.get('discoveries', [])
        piclets = []

        # Load each discovered piclet
        for type_id in discoveries:
            # Extract object name from type_id (e.g., "pillow_canonical" -> "pillow")
            object_name = type_id.rsplit('_', 1)[0]

            # Load the piclet data
            piclet_data = PicletDiscoveryService.load_piclet_data(object_name)
            if piclet_data:
                # Check if it's canonical or variation
                if piclet_data['canonical']['typeId'] == type_id:
                    piclets.append({
                        'type': 'canonical',
                        'typeId': type_id,
                        'objectName': object_name,
                        'discoveredAt': piclet_data['canonical']['discoveredAt'],
                        'scanCount': piclet_data['canonical'].get('scanCount', 1),
                        'picletData': piclet_data['canonical'].get('picletData', {})
                    })
                else:
                    # Find matching variation
                    for variation in piclet_data.get('variations', []):
                        if variation['typeId'] == type_id:
                            piclets.append({
                                'type': 'variation',
                                'typeId': type_id,
                                'objectName': object_name,
                                'attributes': variation.get('attributes', []),
                                'discoveredAt': variation['discoveredAt'],
                                'scanCount': variation.get('scanCount', 1),
                                'picletData': variation.get('picletData', {})
                            })
                            break

        # Sort by discovery date (most recent first)
        piclets.sort(key=lambda x: x.get('discoveredAt', ''), reverse=True)

        return {
            "success": True,
            "piclets": piclets,
            "stats": {
                "username": user_info.get('preferred_username'),
                "name": user_info.get('name'),
                "picture": user_info.get('picture'),
                "totalFinds": user_profile.get('totalFinds', 0),
                "uniqueFinds": user_profile.get('uniqueFinds', 0),
                "rarityScore": user_profile.get('rarityScore', 0),
                "joinedAt": user_profile.get('joinedAt')
            }
        }
    except Exception as e:
        print(f"Failed to get user piclets: {e}")
        return {
            "success": False,
            "error": str(e),
            "piclets": []
        }


def get_recent_activity(limit: int = 20) -> dict:
    """
    Get recent discoveries across all users
    """
    try:
        activities = []

        # List all piclet files
        try:
            files = list_repo_files(
                repo_id=DATASET_REPO,
                repo_type=DATASET_TYPE,
                token=HF_TOKEN
            )
            piclet_files = [f for f in files if f.startswith("piclets/") and f.endswith(".json")]
        except Exception:
            piclet_files = []

        # Load recent piclets (simplified - in production, maintain a separate activity log)
        for file_path in piclet_files[-limit:]:
            try:
                object_name = file_path.replace("piclets/", "").replace(".json", "")
                data = PicletDiscoveryService.load_piclet_data(object_name)

                if data:
                    # Add canonical discovery
                    canonical = data["canonical"]
                    activities.append({
                        "type": "discovery",
                        "objectName": object_name,
                        "typeId": canonical["typeId"],
                        "discoveredBy": canonical["discoveredBy"],
                        "discoveredAt": canonical["discoveredAt"],
                        "scanCount": canonical.get("scanCount", 1)
                    })

                    # Add recent variations
                    for variation in data.get("variations", [])[-5:]:
                        activities.append({
                            "type": "variation",
                            "objectName": object_name,
                            "typeId": variation["typeId"],
                            "attributes": variation["attributes"],
                            "discoveredBy": variation["discoveredBy"],
                            "discoveredAt": variation["discoveredAt"],
                            "scanCount": variation.get("scanCount", 1)
                        })
            except Exception:
                continue

        # Sort by discovery date
        activities.sort(key=lambda x: x.get("discoveredAt", ""), reverse=True)

        return {
            "success": True,
            "activities": activities[:limit]
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "activities": []
        }


def get_leaderboard(limit: int = 10) -> dict:
    """
    Get top discoverers
    """
    try:
        leaderboard = []

        # List all user files
        try:
            files = list_repo_files(
                repo_id=DATASET_REPO,
                repo_type=DATASET_TYPE,
                token=HF_TOKEN
            )
            user_files = [f for f in files if f.startswith("users/") and f.endswith(".json")]
        except Exception:
            user_files = []

        # Load user data (user files are keyed by HF user ID, i.e. the sub)
        for file_path in user_files:
            try:
                user_sub = file_path.replace("users/", "").replace(".json", "")
                user_data = PicletDiscoveryService.load_user_data(user_sub)

                leaderboard.append({
                    "username": user_data.get("preferred_username") or user_sub,
                    "totalFinds": user_data.get("totalFinds", 0),
                    "uniqueFinds": user_data.get("uniqueFinds", 0),
                    "rarityScore": user_data.get("rarityScore", 0)
                })
            except Exception:
                continue

        # Sort by rarity score
        leaderboard.sort(key=lambda x: x["rarityScore"], reverse=True)

        # Add ranks
        for i, entry in enumerate(leaderboard[:limit]):
            entry["rank"] = i + 1

        return {
            "success": True,
            "leaderboard": leaderboard[:limit]
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "leaderboard": []
        }


# Create Gradio interface
with gr.Blocks(title="Piclets Discovery Server") as app:
    gr.Markdown("""
    # 🔍 Piclets Discovery Server

    Backend service for the Piclets discovery game.
    Each real-world object has ONE canonical Piclet!
    """)

    with gr.Tab("Generate Piclet"):
        gr.Markdown("""
        ## 🎮 Complete Piclet Generator
        Upload an image and provide your HuggingFace token to generate a complete Piclet.
        This endpoint handles the entire workflow: captioning, concept generation, image creation, and dataset storage.
        """)
        with gr.Row():
            with gr.Column():
                gen_image = gr.Image(label="Upload Image", type="filepath")
                gen_token = gr.Textbox(label="HuggingFace Token", placeholder="hf_...", type="password")
                gen_btn = gr.Button("Generate Piclet", variant="primary")
            with gr.Column():
                gen_result = gr.JSON(label="Generated Piclet Result")

        gen_btn.click(
            fn=generate_piclet,
            inputs=[gen_image, gen_token],
            outputs=gen_result
        )

    with gr.Tab("My Piclets"):
        gr.Markdown("""
        ## 📚 Your Discovery Collection
        View all Piclets you've discovered (includes your stats).
        """)
        with gr.Row():
            with gr.Column():
                my_token = gr.Textbox(label="HuggingFace Token", placeholder="hf_...", type="password")
                my_btn = gr.Button("Get My Piclets", variant="primary")
            with gr.Column():
                my_result = gr.JSON(label="My Piclets")

        my_btn.click(
            fn=get_user_piclets,
            inputs=my_token,
            outputs=my_result
        )

    with gr.Tab("Object Details"):
        gr.Markdown("""
        ## 🔍 View Object Details
        Get complete information about an object (canonical + all variations).
        """)
        with gr.Row():
            with gr.Column():
                obj_name = gr.Textbox(label="Object Name", placeholder="e.g., pillow, macbook")
                obj_btn = gr.Button("Get Details", variant="primary")
            with gr.Column():
                obj_result = gr.JSON(label="Object Details")

        obj_btn.click(
            fn=get_object_details,
            inputs=obj_name,
            outputs=obj_result
        )

    with gr.Tab("Recent Activity"):
        activity_limit = gr.Slider(5, 50, value=20, label="Number of Activities")
        activity_btn = gr.Button("Get Recent Activity")
        activity_result = gr.JSON(label="Recent Discoveries")

        activity_btn.click(
            fn=get_recent_activity,
            inputs=activity_limit,
            outputs=activity_result
        )

    with gr.Tab("Leaderboard"):
        leader_limit = gr.Slider(5, 20, value=10, label="Top N Discoverers")
        leader_btn = gr.Button("Get Leaderboard")
        leader_result = gr.JSON(label="Top Discoverers")

        leader_btn.click(
            fn=get_leaderboard,
            inputs=leader_limit,
            outputs=leader_result
        )

    # API Documentation
    gr.Markdown("""
    ## 🔌 Public API Endpoints

    All endpoints return JSON responses. The frontend only needs these 5 endpoints:

    ### 1. **generate_piclet** (Scanner)
    Complete Piclet generation workflow.
    - Input: `image` (File), `hf_token` (string)
    - Output: Generated Piclet with discovery status

    ### 2. **get_user_piclets** (User Collection)
    Get user's discovered Piclets and stats.
    - Input: `hf_token` (string)
    - Output: List of Piclets + user stats (total/unique finds, rarity score)

    ### 3. **get_object_details** (Object Data)
    Get complete object info (canonical + all variations).
    - Input: `object_name` (string)
    - Output: Canonical + variations + total scans

    ### 4. **get_recent_activity** (Activity Feed)
    Recent discoveries across all users.
    - Input: `limit` (int, default 20)
    - Output: Recent discoveries with timestamps

    ### 5. **get_leaderboard** (Top Users)
    Top discoverers by rarity score.
    - Input: `limit` (int, default 10)
    - Output: Ranked users with stats

    ---
    *Note: Internal helper functions (search_piclet, create_canonical, etc.) are used by generate_piclet but not exposed to frontend.*
    """)


if __name__ == "__main__":
    # ADMIN_PASSWORD is read from the environment but is not currently wired
    # into app.launch(); the web UI and API are served without authentication.
    admin_password = os.getenv("ADMIN_PASSWORD", "changeme")

    # Configure for HuggingFace Space environment
    app.launch()
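
# ---------------------------------------------------------------------------
# Client usage sketch (illustrative only; not executed by this module).
# Assumes the app is deployed as a HuggingFace Space and that Gradio exposes
# each click handler under its default api_name (the wired function's name).
# The Space id below is a placeholder.
#
#   from gradio_client import Client, handle_file
#
#   client = Client("<owner>/<piclets-space>", hf_token="hf_...")
#   result = client.predict(
#       handle_file("photo_of_object.jpg"),  # image
#       "hf_...",                            # user's HuggingFace OAuth token
#       api_name="/generate_piclet",
#   )
#   print(result["discoveryStatus"], result["message"])
# ---------------------------------------------------------------------------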