# piclets-server / app.py
import gradio as gr
from gradio_client import Client, handle_file
import json
import os
import re
from datetime import datetime
from typing import List, Optional
from huggingface_hub import HfApi, hf_hub_download, list_repo_files
from pathlib import Path
import tempfile
import base64
from rembg import remove
from auth import verify_hf_token
# HuggingFace configuration
HF_TOKEN = os.getenv("HF_TOKEN") # Required for writing to dataset
DATASET_REPO = "Fraser/piclets" # Public dataset repository
DATASET_TYPE = "dataset"
# Initialize HuggingFace API with token if available
api = HfApi(token=HF_TOKEN) if HF_TOKEN else HfApi()
# Cache directory for local operations
CACHE_DIR = Path("cache")
CACHE_DIR.mkdir(exist_ok=True)
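# Dataset layout used throughout this module (paths as written/read below):
#   piclets/{normalized_name}.json - canonical piclet plus its variations
#   users/{sub}.json               - per-user profile keyed by HF user id
#   images/{file_name}.png         - generated piclet artwork
#   metadata/stats.json            - global statistics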
class PicletDiscoveryService:
"""Manages Piclet discovery using HuggingFace datasets"""
@staticmethod
def normalize_object_name(name: str) -> str:
"""
Normalize object names for consistent storage and lookup
Examples: "The Blue Pillow" -> "pillow", "wooden chairs" -> "wooden_chair"
"""
if not name:
return "unknown"
# Convert to lowercase and strip
name = name.lower().strip()
# Remove articles (the, a, an)
name = re.sub(r'^(the|a|an)\s+', '', name)
# Remove special characters except spaces
name = re.sub(r'[^a-z0-9\s]', '', name)
# Handle common plurals (basic pluralization rules)
if name.endswith('ies') and len(name) > 4:
name = name[:-3] + 'y' # berries -> berry
elif name.endswith('ves') and len(name) > 4:
name = name[:-3] + 'f' # leaves -> leaf
elif name.endswith('es') and len(name) > 3:
# Check if it's a special case like "glasses"
if not name.endswith(('ses', 'xes', 'zes', 'ches', 'shes')):
                name = name[:-2] # tomatoes -> tomato ("glasses" is kept by the suffix check above)
elif name.endswith('s') and len(name) > 2 and not name.endswith('ss'):
name = name[:-1] # chairs -> chair (but keep glass)
# Replace spaces with underscores
name = re.sub(r'\s+', '_', name.strip())
return name
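    # Example outputs of normalize_object_name (traced from the rules above):
    #   "The Blue Pillow" -> "blue_pillow"
    #   "wooden chairs"   -> "wooden_chair"
    #   "berries"         -> "berry"
    #   "leaves"          -> "leaf"
    #   "glasses"         -> "glasses"  (protected by the "ses" suffix check)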
@staticmethod
def load_piclet_data(object_name: str) -> Optional[dict]:
"""Load Piclet data from HuggingFace dataset"""
try:
normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
file_path = f"piclets/{normalized_name}.json"
# Download the file from HuggingFace
local_path = hf_hub_download(
repo_id=DATASET_REPO,
filename=file_path,
repo_type=DATASET_TYPE,
token=HF_TOKEN,
cache_dir=str(CACHE_DIR)
)
with open(local_path, 'r') as f:
return json.load(f)
except Exception as e:
print(f"Could not load piclet data for {object_name}: {e}")
return None
@staticmethod
def save_piclet_data(object_name: str, data: dict) -> bool:
"""Save Piclet data to HuggingFace dataset"""
try:
normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
file_path = f"piclets/{normalized_name}.json"
# Create a temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(data, f, indent=2)
temp_path = f.name
# Upload to HuggingFace
api.upload_file(
path_or_fileobj=temp_path,
path_in_repo=file_path,
repo_id=DATASET_REPO,
repo_type=DATASET_TYPE,
commit_message=f"Update piclet: {normalized_name}"
)
# Clean up
os.unlink(temp_path)
return True
except Exception as e:
print(f"Failed to save piclet data: {e}")
return False
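    # Note: each save above issues one dataset commit via upload_file, so
    # concurrent saves of the same file can race (last writer wins).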
@staticmethod
def load_user_data(sub: str) -> dict:
"""
Load user profile from dataset by HF user ID (sub)
Args:
sub: HuggingFace user ID (stable identifier)
Returns:
User profile dict or default profile if not found
"""
try:
file_path = f"users/{sub}.json"
local_path = hf_hub_download(
repo_id=DATASET_REPO,
filename=file_path,
repo_type=DATASET_TYPE,
token=HF_TOKEN,
cache_dir=str(CACHE_DIR)
)
with open(local_path, 'r') as f:
return json.load(f)
        except Exception:
# Return default user profile if not found
# Will be populated with actual data on first save
return {
"sub": sub,
"preferred_username": None,
"name": None,
"picture": None,
"joinedAt": datetime.now().isoformat(),
"lastSeen": datetime.now().isoformat(),
"discoveries": [],
"uniqueFinds": 0,
"totalFinds": 0,
"rarityScore": 0,
"visibility": "public"
}
@staticmethod
def save_user_data(sub: str, data: dict) -> bool:
"""
Save user profile to dataset by HF user ID (sub)
Args:
sub: HuggingFace user ID (stable identifier)
data: User profile dict
Returns:
True if successful, False otherwise
"""
try:
file_path = f"users/{sub}.json"
# Update lastSeen timestamp
data["lastSeen"] = datetime.now().isoformat()
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(data, f, indent=2)
temp_path = f.name
api.upload_file(
path_or_fileobj=temp_path,
path_in_repo=file_path,
repo_id=DATASET_REPO,
repo_type=DATASET_TYPE,
commit_message=f"Update user profile: {data.get('preferred_username', sub)}"
)
os.unlink(temp_path)
return True
except Exception as e:
print(f"Failed to save user data: {e}")
return False
@staticmethod
def get_or_create_user_profile(user_info: dict) -> dict:
"""
Get existing user profile or create new one from OAuth user_info
Refreshes cached profile data on each call
Args:
user_info: OAuth user info from HF (sub, preferred_username, name, picture)
Returns:
User profile dict
"""
sub = user_info['sub']
# Load existing profile
profile = PicletDiscoveryService.load_user_data(sub)
# Update cached profile fields from OAuth
profile['sub'] = sub
profile['preferred_username'] = user_info.get('preferred_username')
profile['name'] = user_info.get('name')
profile['picture'] = user_info.get('picture')
profile['email'] = user_info.get('email')
# Set joinedAt only if this is a new profile
if 'joinedAt' not in profile or not profile['joinedAt']:
profile['joinedAt'] = datetime.now().isoformat()
return profile
@staticmethod
def update_global_stats() -> dict:
"""Update and return global statistics"""
try:
# Try to load existing stats
try:
local_path = hf_hub_download(
repo_id=DATASET_REPO,
filename="metadata/stats.json",
repo_type=DATASET_TYPE,
token=HF_TOKEN,
cache_dir=str(CACHE_DIR)
)
with open(local_path, 'r') as f:
stats = json.load(f)
            except Exception:
stats = {
"totalDiscoveries": 0,
"uniqueObjects": 0,
"totalVariations": 0,
"lastUpdated": datetime.now().isoformat()
}
return stats
except Exception as e:
print(f"Failed to update global stats: {e}")
return {}
class PicletGeneratorService:
"""
Orchestrates Piclet generation by calling external AI services
Uses user's hf_token to consume their GPU quota
"""
# Space endpoints
JOY_CAPTION_SPACE = "fancyfeast/joy-caption-alpha-two"
GPT_OSS_SPACE = "amd/gpt-oss-120b-chatbot"
FLUX_SPACE = "black-forest-labs/FLUX.1-schnell"
@staticmethod
def generate_enhanced_caption(image_path: str, hf_token: str) -> str:
"""Generate detailed image description using JoyCaption
Args:
image_path: Path to image file
hf_token: User's HuggingFace token
"""
try:
print(f"Connecting to JoyCaption space with user token...")
client = Client(
PicletGeneratorService.JOY_CAPTION_SPACE,
hf_token=hf_token
)
print(f"Generating caption for image...")
result = client.predict(
handle_file(image_path), # Wrap path so client uploads file
"Descriptive", # caption_type
"medium-length", # caption_length
[], # extra_options
"", # name_input
"Describe this image in detail, identifying any recognizable objects, brands, logos, or specific models. Be specific about product names and types.", # custom_prompt
api_name="/stream_chat"
)
# JoyCaption returns tuple: (prompt_used, caption_text) in .data
result_data = result.data if hasattr(result, 'data') else result
caption = result_data[1] if isinstance(result_data, (list, tuple)) and len(result_data) > 1 else str(result_data)
print(f"Caption generated: {caption[:100]}...")
return caption
except Exception as e:
print(f"Failed to generate caption: {e}")
raise Exception(f"Caption generation failed: {str(e)}")
@staticmethod
def generate_text_with_gpt(prompt: str, hf_token: str) -> str:
"""Generate text using GPT-OSS-120B"""
try:
print(f"Connecting to GPT-OSS space...")
client = Client(
PicletGeneratorService.GPT_OSS_SPACE,
hf_token=hf_token
)
print(f"Generating text...")
result = client.predict(
api_name="/chat",
message=prompt,
history=[],
system_prompt="You are a helpful assistant that creates Pokemon-style monster concepts based on real-world objects.",
temperature=0.7
)
# Extract response text (GPT-OSS formats with Analysis and Response)
result_data = result.data if hasattr(result, 'data') else result
response_text = result_data[0] if isinstance(result_data, (list, tuple)) else str(result_data)
# Try to extract Response section
            response_match = re.search(r'\*\*💬 Response:\*\*\s*\n\n([\s\S]*)', response_text)
if response_match:
return response_match.group(1).strip()
# Fallback: extract after "assistantfinal"
final_match = re.search(r'assistantfinal\s*([\s\S]*)', response_text)
if final_match:
return final_match.group(1).strip()
return response_text
except Exception as e:
print(f"Failed to generate text: {e}")
raise Exception(f"Text generation failed: {str(e)}")
@staticmethod
def generate_piclet_concept(caption: str, hf_token: str) -> dict:
"""
Generate complete Piclet concept from image caption
Returns parsed concept with object name, variation, stats, etc.
"""
concept_prompt = f"""You are analyzing an image to create a Pokemon-style creature. Here's the image description:
"{caption}"
Your task:
1. Identify the PRIMARY PHYSICAL OBJECT with SPECIFICITY (e.g., "macbook" not "laptop", "eiffel tower" not "tower", "iphone" not "phone", "starbucks mug" not "mug")
2. Determine if there's a meaningful VARIATION (e.g., "silver", "pro", "night", "gaming", "vintage")
3. Assess rarity based on uniqueness
4. Create a complete Pokemon-style monster concept
Format your response EXACTLY as follows:
```md
# Canonical Object
{{Specific object name: "macbook", "eiffel tower", "iphone", "tesla", "le creuset mug", "nintendo switch"}}
{{NOT generic terms like: "laptop", "tower", "phone", "car", "mug", "console"}}
{{Include brand/model/landmark name when identifiable}}
# Variation
{{OPTIONAL: one distinctive attribute like "silver", "pro", "night", "gaming", OR use "canonical" if this is the standard/default version with no special variation}}
# Object Rarity
{{common, uncommon, rare, epic, or legendary based on object uniqueness}}
# Monster Name
{{Creative 8-11 letter name based on the SPECIFIC object, e.g., "Macbyte" for MacBook, "Towerfell" for Eiffel Tower}}
# Primary Type
{{beast, bug, aquatic, flora, mineral, space, machina, structure, culture, or cuisine}}
# Physical Stats
Height: {{e.g., "1.2m" or "3'5\\""}}
Weight: {{e.g., "15kg" or "33 lbs"}}
# Personality
{{1-2 sentences describing personality traits}}
# Monster Description
{{2-3 paragraphs describing how the SPECIFIC object's features translate into monster features. Reference the actual object by name. This is the creature's bio.}}
# Monster Image Prompt
{{Concise visual description for anime-style image generation focusing on colors, shapes, and key features inspired by the specific object}}
```
CRITICAL RULES:
- Canonical Object MUST be SPECIFIC: "macbook" not "laptop", "big ben" not "clock tower", "coca cola" not "soda"
- If you can identify a brand, model, or proper name from the description, USE IT
- Variation should be meaningful and distinctive (material, style, color, context, or model variant)
- Monster Description must describe the CREATURE with references to the specific object's features
- Primary Type must match the object category (machina for electronics, structure for buildings, etc.)"""
response_text = PicletGeneratorService.generate_text_with_gpt(concept_prompt, hf_token)
# Parse the concept
return PicletGeneratorService.parse_concept(response_text)
@staticmethod
def parse_concept(concept_text: str) -> dict:
"""Parse structured concept text into dict"""
# Remove code block markers if present
if '```' in concept_text:
code_block_match = re.search(r'```(?:md|markdown)?\s*\n([\s\S]*?)```', concept_text)
if code_block_match:
concept_text = code_block_match.group(1).strip()
def extract_section(text: str, section: str) -> str:
"""Extract content of a markdown section"""
pattern = rf'\*{{0,2}}#\s*{re.escape(section)}\s*\*{{0,2}}\s*\n([\s\S]*?)(?=^\*{{0,2}}#|$)'
match = re.search(pattern, text, re.MULTILINE)
if match:
content = match.group(1).strip()
# Remove curly braces and quotes that GPT sometimes adds
content = re.sub(r'^[{"]|["}]$', '', content)
content = re.sub(r'^.*:\s*["\']|["\']$', '', content)
return content.strip()
return ''
# Extract all sections
object_name = extract_section(concept_text, 'Canonical Object').lower()
variation_text = extract_section(concept_text, 'Variation')
rarity_text = extract_section(concept_text, 'Object Rarity').lower()
monster_name = extract_section(concept_text, 'Monster Name')
primary_type = extract_section(concept_text, 'Primary Type').lower()
description = extract_section(concept_text, 'Monster Description')
image_prompt = extract_section(concept_text, 'Monster Image Prompt')
# Parse physical stats
physical_stats_text = extract_section(concept_text, 'Physical Stats')
height_match = re.search(r'Height:\s*(.+)', physical_stats_text, re.IGNORECASE)
weight_match = re.search(r'Weight:\s*(.+)', physical_stats_text, re.IGNORECASE)
height = height_match.group(1).strip() if height_match else None
weight = weight_match.group(1).strip() if weight_match else None
personality = extract_section(concept_text, 'Personality')
# Clean monster name
if monster_name:
monster_name = re.sub(r'\*+', '', monster_name) # Remove asterisks
if ',' in monster_name:
monster_name = monster_name.split(',')[0]
if len(monster_name) > 12:
monster_name = monster_name[:12]
# Parse variation
attributes = []
if variation_text and variation_text.lower() not in ['none', 'canonical', '']:
attributes = [variation_text.lower()]
# Map rarity to tier
tier = 'medium'
if 'common' in rarity_text:
tier = 'low'
elif 'uncommon' in rarity_text:
tier = 'medium'
elif 'rare' in rarity_text and 'epic' not in rarity_text:
tier = 'high'
elif 'legendary' in rarity_text or 'epic' in rarity_text or 'mythical' in rarity_text:
tier = 'legendary'
return {
'objectName': object_name,
'attributes': attributes,
'concept': concept_text,
'stats': {
'name': monster_name or 'Unknown',
'description': description,
'tier': tier,
'primaryType': primary_type or 'beast',
'height': height,
'weight': weight,
'personality': personality
},
'imagePrompt': image_prompt
}
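    # Illustrative parse result (hypothetical values, following the prompt's
    # own examples):
    #   {'objectName': 'macbook', 'attributes': ['silver'],
    #    'stats': {'name': 'Macbyte', 'tier': 'high', 'primaryType': 'machina',
    #              'height': '1.2m', 'weight': '15kg', ...},
    #    'imagePrompt': '...', 'concept': '<full markdown text>'}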
@staticmethod
def generate_piclet_image(image_prompt: str, tier: str, hf_token: str) -> dict:
"""Generate Piclet image using Flux"""
try:
print(f"Connecting to Flux space...")
client = Client(
PicletGeneratorService.FLUX_SPACE,
hf_token=hf_token
)
tier_descriptions = {
'low': 'simple and iconic design',
'medium': 'detailed and well-crafted design',
'high': 'highly detailed and impressive design with special effects',
'legendary': 'highly detailed and majestic design with dramatic lighting and aura effects'
}
full_prompt = f"{image_prompt}\nNow generate an Pokรฉmon Anime image of the monster in an idle pose with a plain dark-grey background. This is a {tier} tier monster with a {tier_descriptions.get(tier, tier_descriptions['medium'])}. The monster should not be attacking or in motion. The full monster must be visible within the frame."
print(f"Generating image with prompt: {full_prompt[:100]}...")
result = client.predict(
full_prompt, # prompt
0, # seed
True, # randomize_seed
1024, # width
1024, # height
4, # num_inference_steps
api_name="/infer"
)
# Extract image URL and seed
result_data = result.data if hasattr(result, 'data') else result
image_data = result_data[0] if isinstance(result_data, (list, tuple)) else result_data
seed = result_data[1] if isinstance(result_data, (list, tuple)) and len(result_data) > 1 else 0
# Handle different return formats
image_url = None
if isinstance(image_data, str):
image_url = image_data
elif isinstance(image_data, dict):
image_url = image_data.get('url') or image_data.get('path')
if not image_url:
raise Exception("Failed to extract image URL from Flux response")
return {
'imageUrl': image_url,
'seed': seed,
'prompt': image_prompt
}
except Exception as e:
print(f"Failed to generate image: {e}")
raise Exception(f"Image generation failed: {str(e)}")
@staticmethod
def remove_background(image_path: str) -> str:
"""
Remove background from image using rembg
Returns base64 encoded PNG with transparency
Args:
image_path: Path to the input image file
Returns:
Base64 encoded string of the image with transparent background
"""
try:
print(f"Removing background from image...")
# Open the image
with open(image_path, 'rb') as f:
input_image = f.read()
# Remove background using rembg
output_image = remove(input_image)
# Convert to base64
base64_image = base64.b64encode(output_image).decode('utf-8')
print(f"Background removal completed")
return f"data:image/png;base64,{base64_image}"
except Exception as e:
print(f"Failed to remove background: {e}")
raise Exception(f"Background removal failed: {str(e)}")
@staticmethod
def upload_image_to_dataset(image_data: str, file_name: str) -> str:
"""
Upload image to HuggingFace dataset
Args:
image_data: Base64 encoded image data (with or without data URI prefix)
file_name: Name for the file (e.g., "pillow_canonical.png")
Returns:
URL to the uploaded image in the dataset
"""
try:
print(f"Uploading image to dataset: {file_name}")
# Remove data URI prefix if present
if image_data.startswith('data:'):
image_data = image_data.split(',', 1)[1]
# Decode base64 to bytes
image_bytes = base64.b64decode(image_data)
# Create temporary file
with tempfile.NamedTemporaryFile(mode='wb', suffix='.png', delete=False) as f:
f.write(image_bytes)
temp_path = f.name
# Upload to HuggingFace dataset
file_path = f"images/{file_name}"
api.upload_file(
path_or_fileobj=temp_path,
path_in_repo=file_path,
repo_id=DATASET_REPO,
repo_type=DATASET_TYPE,
commit_message=f"Add piclet image: {file_name}"
)
# Clean up temp file
os.unlink(temp_path)
# Return the dataset URL
dataset_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{file_path}"
print(f"Image uploaded successfully: {dataset_url}")
return dataset_url
except Exception as e:
print(f"Failed to upload image: {e}")
raise Exception(f"Image upload failed: {str(e)}")
# API Endpoints
def search_piclet(object_name: str, attributes: List[str]) -> dict:
"""
Search for canonical Piclet or variations
Returns matching piclet or None
"""
piclet_data = PicletDiscoveryService.load_piclet_data(object_name)
if not piclet_data:
return {
"status": "new",
"message": f"No Piclet found for '{object_name}'",
"piclet": None
}
# Check if searching for canonical (no attributes)
if not attributes or len(attributes) == 0:
return {
"status": "existing",
"message": f"Found canonical Piclet for '{object_name}'",
"piclet": piclet_data.get("canonical")
}
# Search for matching variation
variations = piclet_data.get("variations", [])
for variation in variations:
var_attrs = set(variation.get("attributes", []))
search_attrs = set(attributes)
# Check for close match (at least 50% overlap)
overlap = len(var_attrs.intersection(search_attrs))
if overlap >= len(search_attrs) * 0.5:
return {
"status": "variation",
"message": f"Found variation of '{object_name}'",
"piclet": variation,
"canonicalId": piclet_data["canonical"]["typeId"]
}
# No variation found, suggest creating one
return {
"status": "new_variation",
"message": f"No variation found for '{object_name}' with attributes {attributes}",
"canonicalId": piclet_data["canonical"]["typeId"],
"piclet": None
}
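# Example of the 50% overlap rule above: searching with ["silver", "pro"]
# matches a stored variation ["silver"] (overlap 1 >= 2 * 0.5), while
# searching with ["gaming"] does not match ["silver"] (overlap 0 < 0.5).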
def create_canonical(object_name: str, piclet_data: str, token_or_username: str) -> dict:
"""
Create a new canonical Piclet
Args:
object_name: The normalized object name (e.g., "pillow")
piclet_data: JSON string of Piclet instance data
token_or_username: Either OAuth token (starts with "hf_") or username for testing
Returns:
Dict with success status and piclet data
"""
try:
piclet_json = json.loads(piclet_data) if isinstance(piclet_data, str) else piclet_data
# Determine if this is a token or username
user_info = None
if token_or_username and token_or_username.startswith('hf_'):
# OAuth token - verify it
user_info = verify_hf_token(token_or_username)
if not user_info:
return {
"success": False,
"error": "Invalid OAuth token"
}
else:
# Legacy username mode (for testing)
user_info = {
"sub": f"legacy_{token_or_username}",
"preferred_username": token_or_username,
"name": token_or_username,
"picture": None
}
# Get or create user profile
user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info)
# Create canonical entry with full discoverer info
canonical_data = {
"canonical": {
"objectName": object_name,
"typeId": f"{PicletDiscoveryService.normalize_object_name(object_name)}_canonical",
"discoveredBy": user_info['preferred_username'],
"discovererSub": user_info['sub'],
"discovererUsername": user_info['preferred_username'],
"discovererName": user_info.get('name'),
"discovererPicture": user_info.get('picture'),
"discoveredAt": datetime.now().isoformat(),
"scanCount": 1,
"picletData": piclet_json
},
"variations": []
}
# Save to dataset
if PicletDiscoveryService.save_piclet_data(object_name, canonical_data):
# Update user profile
user_profile["discoveries"].append(canonical_data["canonical"]["typeId"])
user_profile["uniqueFinds"] += 1
user_profile["totalFinds"] += 1
user_profile["rarityScore"] += 100 # Bonus for canonical discovery
PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)
return {
"success": True,
"message": f"Created canonical Piclet for '{object_name}'",
"piclet": canonical_data["canonical"]
}
else:
return {
"success": False,
"error": "Failed to save canonical Piclet"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def create_variation(canonical_id: str, attributes: List[str], piclet_data: str, token_or_username: str, object_name: str) -> dict:
"""
Create a variation of an existing canonical Piclet with OAuth verification
Args:
canonical_id: ID of the canonical Piclet
attributes: List of variation attributes
piclet_data: JSON data for the Piclet
token_or_username: Either OAuth token (starts with "hf_") or username for testing
object_name: Normalized object name
Returns:
Success/error dict with variation data
"""
try:
piclet_json = json.loads(piclet_data) if isinstance(piclet_data, str) else piclet_data
# Verify token or use legacy mode
user_info = None
if token_or_username and token_or_username.startswith('hf_'):
user_info = verify_hf_token(token_or_username)
if not user_info:
return {"success": False, "error": "Invalid OAuth token"}
else:
# Legacy mode for testing
user_info = {
"sub": f"legacy_{token_or_username}",
"preferred_username": token_or_username,
"name": token_or_username,
"picture": None
}
# Get or create user profile
user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info)
# Load existing data
existing_data = PicletDiscoveryService.load_piclet_data(object_name)
if not existing_data:
return {
"success": False,
"error": f"Canonical Piclet not found for '{object_name}'"
}
# Create variation entry
variation_id = f"{PicletDiscoveryService.normalize_object_name(object_name)}_{len(existing_data['variations']) + 1:03d}"
variation = {
"typeId": variation_id,
"attributes": attributes,
"discoveredBy": user_info['preferred_username'],
"discovererSub": user_info['sub'],
"discovererUsername": user_info['preferred_username'],
"discovererName": user_info.get('name'),
"discovererPicture": user_info.get('picture'),
"discoveredAt": datetime.now().isoformat(),
"scanCount": 1,
"picletData": piclet_json
}
# Add to variations
existing_data["variations"].append(variation)
# Save updated data
if PicletDiscoveryService.save_piclet_data(object_name, existing_data):
# Update user profile
user_profile["discoveries"].append(variation_id)
user_profile["totalFinds"] += 1
user_profile["rarityScore"] += 50 # Bonus for variation discovery
PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)
return {
"success": True,
"message": f"Created variation of '{object_name}'",
"piclet": variation
}
else:
return {
"success": False,
"error": "Failed to save variation"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def increment_scan_count(piclet_id: str, object_name: str) -> dict:
"""
Increment the scan count for a Piclet
"""
try:
data = PicletDiscoveryService.load_piclet_data(object_name)
if not data:
return {
"success": False,
"error": "Piclet not found"
}
# Check canonical
if data["canonical"]["typeId"] == piclet_id:
data["canonical"]["scanCount"] = data["canonical"].get("scanCount", 0) + 1
scan_count = data["canonical"]["scanCount"]
else:
# Check variations
for variation in data["variations"]:
if variation["typeId"] == piclet_id:
variation["scanCount"] = variation.get("scanCount", 0) + 1
scan_count = variation["scanCount"]
break
else:
return {
"success": False,
"error": "Piclet ID not found"
}
# Save updated data
if PicletDiscoveryService.save_piclet_data(object_name, data):
return {
"success": True,
"scanCount": scan_count
}
else:
return {
"success": False,
"error": "Failed to update scan count"
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def generate_piclet(image, hf_token: str) -> dict:
"""
Complete Piclet generation workflow - single endpoint
Takes user's image and hf_token, returns generated Piclet with discovery status
Args:
image: Uploaded image file (Gradio file input)
hf_token: User's HuggingFace OAuth token
Returns:
{
"success": bool,
"piclet": {complete piclet data},
"discoveryStatus": "new" | "variation" | "existing",
"canonicalId": str (if variation/existing),
"message": str
}
"""
try:
# Validate token and get user info
user_info = verify_hf_token(hf_token)
if not user_info:
return {
"success": False,
"error": "Invalid HuggingFace token"
}
print(f"Generating Piclet for user: {user_info.get('preferred_username', 'unknown')}")
# Get user profile (creates if doesn't exist)
user_profile = PicletDiscoveryService.get_or_create_user_profile(user_info)
# Get image path from Gradio (type="filepath" gives us a string path)
image_path = image if isinstance(image, str) else str(image)
# Step 1: Generate caption
print("Step 1/5: Generating image caption...")
caption = PicletGeneratorService.generate_enhanced_caption(image_path, hf_token)
# Step 2: Generate concept
print("Step 2/5: Generating Piclet concept...")
concept_data = PicletGeneratorService.generate_piclet_concept(caption, hf_token)
object_name = concept_data['objectName']
attributes = concept_data['attributes']
stats = concept_data['stats']
image_prompt = concept_data['imagePrompt']
concept_text = concept_data['concept']
# Step 3: Generate image
print("Step 3/5: Generating Piclet image...")
image_result = PicletGeneratorService.generate_piclet_image(
image_prompt,
stats['tier'],
hf_token
)
# Step 3.5: Remove background from generated image
print("Step 3.5/5: Processing image (removing background)...")
image_local_path = image_result['imageUrl']
# Handle both local paths and URLs
if image_local_path.startswith('http'):
# If it's a URL, download it first
import requests
response = requests.get(image_local_path)
with tempfile.NamedTemporaryFile(mode='wb', suffix='.png', delete=False) as f:
f.write(response.content)
image_local_path = f.name
# Remove background
transparent_image_base64 = PicletGeneratorService.remove_background(image_local_path)
# Store the base64 data in image_result for later use
image_result['imageData'] = transparent_image_base64
# Step 4: Check for canonical/variation
print("Step 4/5: Checking for existing canonical...")
existing_data = PicletDiscoveryService.load_piclet_data(object_name)
discovery_status = 'new'
canonical_id = None
scan_count = 1
if existing_data:
# Check if this is an exact canonical match (no attributes)
if not attributes or len(attributes) == 0:
discovery_status = 'existing'
canonical_id = existing_data['canonical']['typeId']
# Increment scan count
existing_data['canonical']['scanCount'] = existing_data['canonical'].get('scanCount', 0) + 1
scan_count = existing_data['canonical']['scanCount']
PicletDiscoveryService.save_piclet_data(object_name, existing_data)
else:
# Check for matching variation
variations = existing_data.get('variations', [])
matched_variation = None
for variation in variations:
var_attrs = set(variation.get('attributes', []))
search_attrs = set(attributes)
overlap = len(var_attrs.intersection(search_attrs))
if overlap >= len(search_attrs) * 0.5:
matched_variation = variation
discovery_status = 'existing'
canonical_id = existing_data['canonical']['typeId']
# Increment variation scan count
variation['scanCount'] = variation.get('scanCount', 0) + 1
scan_count = variation['scanCount']
PicletDiscoveryService.save_piclet_data(object_name, existing_data)
break
if not matched_variation:
discovery_status = 'variation'
canonical_id = existing_data['canonical']['typeId']
# Step 5: Save new discovery if needed
print("Step 5/5: Saving to dataset...")
if discovery_status == 'new':
# Create new canonical
type_id = f"{PicletDiscoveryService.normalize_object_name(object_name)}_canonical"
# Upload image to dataset with canonical filename
normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
image_filename = f"{normalized_name}_canonical.png"
dataset_image_url = PicletGeneratorService.upload_image_to_dataset(
image_result['imageData'],
image_filename
)
canonical_data = {
"canonical": {
"objectName": object_name,
"typeId": type_id,
"discoveredBy": user_info['preferred_username'],
"discovererSub": user_info['sub'],
"discovererUsername": user_info['preferred_username'],
"discovererName": user_info.get('name'),
"discovererPicture": user_info.get('picture'),
"discoveredAt": datetime.now().isoformat(),
"scanCount": scan_count,
"picletData": {
"typeId": type_id,
"nickname": stats['name'],
"stats": stats,
"imageUrl": dataset_image_url,
"imageData": image_result['imageData'],
"imageCaption": caption,
"concept": concept_text,
"imagePrompt": image_prompt,
"createdAt": datetime.now().isoformat()
}
},
"variations": []
}
canonical_id = type_id
PicletDiscoveryService.save_piclet_data(object_name, canonical_data)
# Update user profile
user_profile["discoveries"].append(type_id)
user_profile["uniqueFinds"] = user_profile.get("uniqueFinds", 0) + 1
user_profile["totalFinds"] = user_profile.get("totalFinds", 0) + 1
user_profile["rarityScore"] = user_profile.get("rarityScore", 0) + 100
PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)
elif discovery_status == 'variation':
# Create new variation
existing_data = PicletDiscoveryService.load_piclet_data(object_name)
variation_num = len(existing_data['variations']) + 1
normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
variation_id = f"{normalized_name}_{variation_num:03d}"
# Upload image to dataset with variation filename
image_filename = f"{normalized_name}_{variation_num:03d}.png"
dataset_image_url = PicletGeneratorService.upload_image_to_dataset(
image_result['imageData'],
image_filename
)
variation_data = {
"typeId": variation_id,
"attributes": attributes,
"discoveredBy": user_info['preferred_username'],
"discovererSub": user_info['sub'],
"discovererUsername": user_info['preferred_username'],
"discovererName": user_info.get('name'),
"discovererPicture": user_info.get('picture'),
"discoveredAt": datetime.now().isoformat(),
"scanCount": scan_count,
"picletData": {
"typeId": variation_id,
"nickname": stats['name'],
"stats": stats,
"imageUrl": dataset_image_url,
"imageData": image_result['imageData'],
"imageCaption": caption,
"concept": concept_text,
"imagePrompt": image_prompt,
"createdAt": datetime.now().isoformat()
}
}
existing_data['variations'].append(variation_data)
PicletDiscoveryService.save_piclet_data(object_name, existing_data)
# Update user profile
user_profile["discoveries"].append(variation_id)
user_profile["totalFinds"] = user_profile.get("totalFinds", 0) + 1
user_profile["rarityScore"] = user_profile.get("rarityScore", 0) + 50
PicletDiscoveryService.save_user_data(user_info['sub'], user_profile)
# Build complete response
# For existing piclets, get the stored data; for new/variation, use generated data
if discovery_status == 'existing':
# Load the existing piclet data to return
existing_piclet_data = PicletDiscoveryService.load_piclet_data(object_name)
if existing_piclet_data and existing_piclet_data.get('canonical'):
existing_canonical = existing_piclet_data['canonical']
piclet_data = existing_canonical.get('picletData', {})
piclet_data['discoveryStatus'] = discovery_status
piclet_data['scanCount'] = existing_canonical.get('scanCount', 1)
else:
# Fallback if data not found
piclet_data = {
"typeId": canonical_id,
"nickname": stats['name'],
"stats": stats,
"imageUrl": image_result.get('imageUrl', ''),
"imageData": image_result.get('imageData', ''),
"imageCaption": caption,
"concept": concept_text,
"imagePrompt": image_prompt,
"objectName": object_name,
"attributes": attributes,
"discoveryStatus": discovery_status,
"scanCount": scan_count,
"createdAt": datetime.now().isoformat()
}
else:
# For new and variation, determine the correct dataset URL
if discovery_status == 'new':
normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
image_filename = f"{normalized_name}_canonical.png"
else: # variation
normalized_name = PicletDiscoveryService.normalize_object_name(object_name)
existing_data = PicletDiscoveryService.load_piclet_data(object_name)
variation_num = len(existing_data.get('variations', []))
image_filename = f"{normalized_name}_{variation_num:03d}.png"
dataset_image_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/images/{image_filename}"
piclet_data = {
"typeId": canonical_id,
"nickname": stats['name'],
"stats": stats,
"imageUrl": dataset_image_url,
"imageData": image_result.get('imageData', ''),
"imageCaption": caption,
"concept": concept_text,
"imagePrompt": image_prompt,
"objectName": object_name,
"attributes": attributes,
"discoveryStatus": discovery_status,
"scanCount": scan_count,
"createdAt": datetime.now().isoformat()
}
messages = {
'new': f"Congratulations! You discovered the first {object_name} Piclet!",
'variation': f"You found a new variation of {object_name}!",
'existing': f"You encountered a known {object_name} Piclet."
}
return {
"success": True,
"piclet": piclet_data,
"discoveryStatus": discovery_status,
"canonicalId": canonical_id,
"message": messages.get(discovery_status, "Piclet generated!")
}
except Exception as e:
print(f"Failed to generate Piclet: {e}")
import traceback
traceback.print_exc()
return {
"success": False,
"error": str(e)
}
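# Discovery outcomes in generate_piclet, summarized:
#   no stored file                     -> "new"       (canonical created, +100 rarity)
#   stored file, no attributes         -> "existing"  (canonical scanCount += 1)
#   stored file, >=50% attr overlap    -> "existing"  (variation scanCount += 1)
#   stored file, no matching variation -> "variation" (new variation, +50 rarity)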
def get_object_details(object_name: str) -> dict:
"""
Get complete details for an object (canonical + all variations)
Args:
object_name: The object name (e.g., "pillow", "macbook")
Returns:
{
"success": bool,
"objectName": str,
"canonical": {canonical data},
"variations": [list of variations],
"totalScans": int
}
"""
try:
# Load the object data
piclet_data = PicletDiscoveryService.load_piclet_data(object_name)
if not piclet_data:
return {
"success": False,
"error": f"No piclet found for object '{object_name}'",
"objectName": object_name
}
# Calculate total scans across canonical and variations
total_scans = piclet_data['canonical'].get('scanCount', 0)
for variation in piclet_data.get('variations', []):
total_scans += variation.get('scanCount', 0)
return {
"success": True,
"objectName": object_name,
"canonical": piclet_data['canonical'],
"variations": piclet_data.get('variations', []),
"totalScans": total_scans,
"variationCount": len(piclet_data.get('variations', []))
}
except Exception as e:
print(f"Failed to get object details: {e}")
return {
"success": False,
"error": str(e),
"objectName": object_name
}
def get_user_piclets(hf_token: str) -> dict:
"""
Get all Piclets discovered by a specific user
Args:
hf_token: User's HuggingFace OAuth token
Returns:
{
"success": bool,
"piclets": [list of piclet discoveries],
"stats": {user stats}
}
"""
try:
# Verify token and get user info
user_info = verify_hf_token(hf_token)
if not user_info:
return {
"success": False,
"error": "Invalid HuggingFace token",
"piclets": []
}
# Load user profile
user_profile = PicletDiscoveryService.load_user_data(user_info['sub'])
# Get list of discoveries
discoveries = user_profile.get('discoveries', [])
piclets = []
# Load each discovered piclet
for type_id in discoveries:
# Extract object name from type_id (e.g., "pillow_canonical" -> "pillow")
object_name = type_id.rsplit('_', 1)[0]
# Load the piclet data
piclet_data = PicletDiscoveryService.load_piclet_data(object_name)
if piclet_data:
# Check if it's canonical or variation
if piclet_data['canonical']['typeId'] == type_id:
piclets.append({
'type': 'canonical',
'typeId': type_id,
'objectName': object_name,
'discoveredAt': piclet_data['canonical']['discoveredAt'],
'scanCount': piclet_data['canonical'].get('scanCount', 1),
'picletData': piclet_data['canonical'].get('picletData', {})
})
else:
# Find matching variation
for variation in piclet_data.get('variations', []):
if variation['typeId'] == type_id:
piclets.append({
'type': 'variation',
'typeId': type_id,
'objectName': object_name,
'attributes': variation.get('attributes', []),
'discoveredAt': variation['discoveredAt'],
'scanCount': variation.get('scanCount', 1),
'picletData': variation.get('picletData', {})
})
break
# Sort by discovery date (most recent first)
piclets.sort(key=lambda x: x.get('discoveredAt', ''), reverse=True)
return {
"success": True,
"piclets": piclets,
"stats": {
"username": user_info.get('preferred_username'),
"name": user_info.get('name'),
"picture": user_info.get('picture'),
"totalFinds": user_profile.get('totalFinds', 0),
"uniqueFinds": user_profile.get('uniqueFinds', 0),
"rarityScore": user_profile.get('rarityScore', 0),
"joinedAt": user_profile.get('joinedAt')
}
}
except Exception as e:
print(f"Failed to get user piclets: {e}")
return {
"success": False,
"error": str(e),
"piclets": []
}
def get_recent_activity(limit: int = 20) -> dict:
"""
Get recent discoveries across all users
"""
try:
activities = []
# List all piclet files
try:
files = list_repo_files(
repo_id=DATASET_REPO,
repo_type=DATASET_TYPE,
token=HF_TOKEN
)
piclet_files = [f for f in files if f.startswith("piclets/") and f.endswith(".json")]
        except Exception:
piclet_files = []
# Load recent piclets (simplified - in production, maintain a separate activity log)
for file_path in piclet_files[-limit:]:
try:
object_name = file_path.replace("piclets/", "").replace(".json", "")
data = PicletDiscoveryService.load_piclet_data(object_name)
if data:
# Add canonical discovery
canonical = data["canonical"]
activities.append({
"type": "discovery",
"objectName": object_name,
"typeId": canonical["typeId"],
"discoveredBy": canonical["discoveredBy"],
"discoveredAt": canonical["discoveredAt"],
"scanCount": canonical.get("scanCount", 1)
})
# Add recent variations
for variation in data.get("variations", [])[-5:]:
activities.append({
"type": "variation",
"objectName": object_name,
"typeId": variation["typeId"],
"attributes": variation["attributes"],
"discoveredBy": variation["discoveredBy"],
"discoveredAt": variation["discoveredAt"],
"scanCount": variation.get("scanCount", 1)
})
            except Exception:
continue
# Sort by discovery date
activities.sort(key=lambda x: x.get("discoveredAt", ""), reverse=True)
return {
"success": True,
"activities": activities[:limit]
}
except Exception as e:
return {
"success": False,
"error": str(e),
"activities": []
}
def get_leaderboard(limit: int = 10) -> dict:
"""
Get top discoverers
"""
try:
leaderboard = []
# List all user files
try:
files = list_repo_files(
repo_id=DATASET_REPO,
repo_type=DATASET_TYPE,
token=HF_TOKEN
)
user_files = [f for f in files if f.startswith("users/") and f.endswith(".json")]
        except Exception:
user_files = []
# Load user data
for file_path in user_files:
try:
                sub = file_path.replace("users/", "").replace(".json", "")
                user_data = PicletDiscoveryService.load_user_data(sub)
                leaderboard.append({
                    # User files are keyed by HF user id (sub); show the
                    # cached preferred_username when the profile has one.
                    "username": user_data.get("preferred_username") or sub,
                    "totalFinds": user_data.get("totalFinds", 0),
                    "uniqueFinds": user_data.get("uniqueFinds", 0),
                    "rarityScore": user_data.get("rarityScore", 0)
                })
            except Exception:
continue
# Sort by rarity score
leaderboard.sort(key=lambda x: x["rarityScore"], reverse=True)
# Add ranks
for i, entry in enumerate(leaderboard[:limit]):
entry["rank"] = i + 1
return {
"success": True,
"leaderboard": leaderboard[:limit]
}
except Exception as e:
return {
"success": False,
"error": str(e),
"leaderboard": []
}
# Create Gradio interface
with gr.Blocks(title="Piclets Discovery Server") as app:
gr.Markdown("""
# ๐Ÿ” Piclets Discovery Server
Backend service for the Piclets discovery game. Each real-world object has ONE canonical Piclet!
""")
with gr.Tab("Generate Piclet"):
gr.Markdown("""
        ## 🎮 Complete Piclet Generator
Upload an image and provide your HuggingFace token to generate a complete Piclet.
This endpoint handles the entire workflow: captioning, concept generation, image creation, and dataset storage.
""")
with gr.Row():
with gr.Column():
gen_image = gr.Image(label="Upload Image", type="filepath")
gen_token = gr.Textbox(label="HuggingFace Token", placeholder="hf_...", type="password")
gen_btn = gr.Button("Generate Piclet", variant="primary")
with gr.Column():
gen_result = gr.JSON(label="Generated Piclet Result")
gen_btn.click(
fn=generate_piclet,
inputs=[gen_image, gen_token],
outputs=gen_result
)
with gr.Tab("My Piclets"):
gr.Markdown("""
        ## 📚 Your Discovery Collection
View all Piclets you've discovered (includes your stats).
""")
with gr.Row():
with gr.Column():
my_token = gr.Textbox(label="HuggingFace Token", placeholder="hf_...", type="password")
my_btn = gr.Button("Get My Piclets", variant="primary")
with gr.Column():
my_result = gr.JSON(label="My Piclets")
my_btn.click(
fn=get_user_piclets,
inputs=my_token,
outputs=my_result
)
with gr.Tab("Object Details"):
gr.Markdown("""
## ๐Ÿ” View Object Details
Get complete information about an object (canonical + all variations).
""")
with gr.Row():
with gr.Column():
obj_name = gr.Textbox(label="Object Name", placeholder="e.g., pillow, macbook")
obj_btn = gr.Button("Get Details", variant="primary")
with gr.Column():
obj_result = gr.JSON(label="Object Details")
obj_btn.click(
fn=get_object_details,
inputs=obj_name,
outputs=obj_result
)
with gr.Tab("Recent Activity"):
activity_limit = gr.Slider(5, 50, value=20, label="Number of Activities")
activity_btn = gr.Button("Get Recent Activity")
activity_result = gr.JSON(label="Recent Discoveries")
activity_btn.click(
fn=get_recent_activity,
inputs=activity_limit,
outputs=activity_result
)
with gr.Tab("Leaderboard"):
leader_limit = gr.Slider(5, 20, value=10, label="Top N Discoverers")
leader_btn = gr.Button("Get Leaderboard")
leader_result = gr.JSON(label="Top Discoverers")
leader_btn.click(
fn=get_leaderboard,
inputs=leader_limit,
outputs=leader_result
)
# API Documentation
gr.Markdown("""
    ## 🔌 Public API Endpoints
All endpoints return JSON responses. The frontend only needs these 5 endpoints:
### 1. **generate_piclet** (Scanner)
Complete Piclet generation workflow.
- Input: `image` (File), `hf_token` (string)
- Output: Generated Piclet with discovery status
### 2. **get_user_piclets** (User Collection)
Get user's discovered Piclets and stats.
- Input: `hf_token` (string)
- Output: List of Piclets + user stats (total/unique finds, rarity score)
### 3. **get_object_details** (Object Data)
Get complete object info (canonical + all variations).
- Input: `object_name` (string)
- Output: Canonical + variations + total scans
### 4. **get_recent_activity** (Activity Feed)
Recent discoveries across all users.
- Input: `limit` (int, default 20)
- Output: Recent discoveries with timestamps
### 5. **get_leaderboard** (Top Users)
Top discoverers by rarity score.
- Input: `limit` (int, default 10)
- Output: Ranked users with stats
---
*Note: Internal helper functions (search_piclet, create_canonical, etc.) are used by generate_piclet but not exposed to frontend.*
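    ### Example: calling the API with `gradio_client`
    A minimal sketch (the Space id and endpoint name below are assumptions;
    Gradio typically exposes each click handler under its function name):
    ```python
    from gradio_client import Client, handle_file

    client = Client("Fraser/piclets-server")  # assumed Space id
    result = client.predict(
        handle_file("photo.jpg"),  # image
        "hf_...",                  # your HuggingFace token
        api_name="/generate_piclet",
    )
    print(result)  # JSON payload with success, piclet, discoveryStatus, message
    ```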
""")
if __name__ == "__main__":
# Protect web UI with authentication while allowing API access
admin_password = os.getenv("ADMIN_PASSWORD", "changeme")
# Configure for HuggingFace Space environment
app.launch()