import os import sys # Set critical environment variables first os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python" os.environ["WATCHDOG_OPTIONAL"] = "1" os.environ["PYTORCH_JIT"] = "0" os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1" # Use Hugging Face optimized file transfer # Import third party modules import streamlit as st import numpy as np import random from PIL import Image import io import time # Set up imports for huggingface_hub # Import what we can, but handle potential import errors try: from huggingface_hub import HfApi, HfFolder, Repository, create_repo, model_info, hf_hub_download from huggingface_hub import login as hf_login except ImportError as e: st.error(f"Error importing from huggingface_hub: {e}") # Don't rely on specific internal modules that might change between versions # We'll handle errors more gracefully instead # Configure Hugging Face cache and environment os.environ["HF_HOME"] = os.path.join(os.getcwd(), ".cache/huggingface") # Import PyTorch after environment setup import torch from diffusers import FluxFillPipeline # Configure a custom download callback class DownloadProgressCallback: def __init__(self, progress_placeholder): self.progress_placeholder = progress_placeholder self.last_update = time.time() self.downloaded = 0 self.total = 0 def __call__(self, downloaded, total): self.downloaded = downloaded self.total = total # Limit updates to avoid too much UI refresh (every 0.5 seconds) current_time = time.time() if current_time - self.last_update > 0.5: self.last_update = current_time if total > 0: progress_pct = (downloaded / total) * 100 self.progress_placeholder.text( f"Downloading component: {progress_pct:.1f}% " f"({downloaded / 1024 / 1024:.1f}MB / {total / 1024 / 1024:.1f}MB)" ) else: self.progress_placeholder.text( f"Downloading: {downloaded / 1024 / 1024:.1f}MB downloaded" ) # Constants MAX_SEED = np.iinfo(np.int32).max MAX_IMAGE_SIZE = 2048 # Setting page config st.set_page_config( page_title="FLUX.1 Fill [dev]", layout="wide" ) # Title and description st.markdown(""" # FLUX.1 Fill [dev] 12B param rectified flow transformer structural conditioning tuned, guidance-distilled from [FLUX.1 [pro]](https://blackforestlabs.ai/) [[non-commercial license](https://huggingface.co/black-forest-labs/FLUX.1-dev/blob/main/LICENSE.md)] [[blog](https://blackforestlabs.ai/announcing-black-forest-labs/)] [[model](https://huggingface.co/black-forest-labs/FLUX.1-dev)] """) # Add simple instructions st.sidebar.markdown(""" ## Important Setup Information This app uses the FLUX.1-Fill-dev model which requires special access: 1. Sign up/login at [Hugging Face](https://huggingface.co/) 2. Request access to [FLUX.1-Fill-dev](https://huggingface.co/black-forest-labs/FLUX.1-Fill-dev) by clicking 'Access repository' 3. Wait for approval from model owners ### For Hugging Face Spaces Setup: 1. Go to your Space settings > Secrets 2. Add a new secret with the name `HF_TOKEN` 3. Set its value to your Hugging Face API token (found in your account settings) """) # Try to get a Hugging Face token from environment variables def get_hf_token(): # Check common environment variable names for HF tokens token_env_vars = [ 'HF_TOKEN', 'HUGGINGFACE_TOKEN', 'HUGGING_FACE_HUB_TOKEN', 'HF_API_TOKEN', 'HUGGINGFACE_API_TOKEN', 'HUGGINGFACE_HUB_TOKEN' ] for env_var in token_env_vars: if env_var in os.environ and os.environ[env_var].strip(): st.sidebar.success(f"Found token in {env_var}") return os.environ[env_var].strip() # If we're here, no token was found st.sidebar.warning("No Hugging Face token found in environment variables") st.sidebar.info("Checking for cached credentials...") # Check if there's a cached token try: from huggingface_hub.constants import HF_TOKEN_PATH if os.path.exists(HF_TOKEN_PATH): with open(HF_TOKEN_PATH, "r") as f: token = f.read().strip() if token: st.sidebar.success("Found cached Hugging Face token") return token except Exception as e: st.sidebar.error(f"Error checking cached token: {str(e)}") return None @st.cache_resource(show_spinner=False) def load_model(): """Load the model using the Hugging Face CLI login approach""" # Start with a pre-check of the model st.sidebar.subheader("Model Loading Status") model_status = st.sidebar.empty() model_status.info("Preparing to load model...") # Get device - prefer CUDA if available device = "cuda" if torch.cuda.is_available() else "cpu" device_status = st.sidebar.empty() device_status.info(f"Using device: {device}") # Pre-check model availability try: # Try to get token first token = get_hf_token() if token: # Check if the model is available in Hugging Face try: model_status.info("Checking model access...") # Use safer HfApi instead of direct model_info if possible api = HfApi() try: info = api.model_info("black-forest-labs/FLUX.1-Fill-dev") model_status.success(f"Model verified: {info.id}") if hasattr(info, 'tags'): model_status.info(f"Model tags: {', '.join(info.tags)}") except Exception as api_error: # Try fallback with model_info try: info = model_info("black-forest-labs/FLUX.1-Fill-dev", token=token) model_status.success(f"Model verified: {info.id}") if hasattr(info, 'tags'): model_status.info(f"Model tags: {', '.join(info.tags)}") except Exception: raise api_error except Exception as access_error: model_status.error(f"Model access check failed: {str(access_error)}") else: model_status.warning("No Hugging Face token found - model may not load") except Exception as precheck_error: model_status.error(f"Model pre-check failed: {str(precheck_error)}") # Ignore the tokenizer warnings about slow tokenizers import transformers transformers.logging.set_verbosity_error() try: # Check for token in environment token = get_hf_token() st.info("Loading FLUX.1-Fill-dev model...") st.info(f"Token available: {'Yes' if token else 'No'}") try: # Try loading with token if available if token: st.info("Attempting to load model with auth token...") # Use low_cpu_mem_usage and offload to disk to avoid memory issues progress_placeholder = st.empty() progress_placeholder.text("Starting model loading...") # Try component by component loading with custom progress tracking try: from diffusers import AutoPipelineForInpainting progress_placeholder.text("Loading components (1/4): Starting...") # Create progress callback for downloads download_progress = st.empty() progress_callback = DownloadProgressCallback(download_progress) # Try pre-downloading files to ensure all components are available progress_placeholder.text("Starting model files download...") try: # Try to download individual files one by one instead of snapshot # This is more robust across different huggingface_hub versions progress_placeholder.text("Downloading model files using hf_hub_download...") # List of essential files to try downloading essential_files = [ "config.json", "scheduler/scheduler_config.json", "tokenizer/tokenizer_config.json" ] # Try downloading files one by one for file_path in essential_files: try: progress_placeholder.text(f"Downloading {file_path}...") hf_hub_download( repo_id="black-forest-labs/FLUX.1-Fill-dev", filename=file_path, token=token, local_dir="model_cache", resume_download=True ) except Exception as file_error: progress_placeholder.text(f"Error downloading {file_path}: {str(file_error)}") # Continue with the next file download_progress.text("Download completed successfully!") except Exception as download_error: download_progress.text(f"Snapshot download error: {str(download_error)}") # This is ok, we'll let the pipeline handle it # Now use the downloaded files (or let pipeline download if needed) pipeline_kwargs = { "torch_dtype": torch.float16 if torch.cuda.is_available() else torch.float32, "token": token, "device_map": "auto", # Enable automatic device mapping "low_cpu_mem_usage": True, "use_safetensors": True, "variant": "fp16" if torch.cuda.is_available() else None, "cache_dir": "model_cache" # Use our download cache } # Load the pipeline component by component to track progress progress_placeholder.text("Loading components (2/4): Loading configuration...") model = AutoPipelineForInpainting.from_pretrained( "black-forest-labs/FLUX.1-Fill-dev", **pipeline_kwargs ) # If we get here, we successfully loaded progress_placeholder.text("Loading components (4/4): Successfully loaded all components!") except Exception as auto_error: progress_placeholder.text(f"AutoPipeline approach failed, trying direct pipeline loading: {str(auto_error)}") # Try different parameter combinations try: # Try the modern token parameter first model = FluxFillPipeline.from_pretrained( "black-forest-labs/FLUX.1-Fill-dev", torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, token=token, device_map="auto", low_cpu_mem_usage=True, use_safetensors=True, variant="fp16" if torch.cuda.is_available() else None ) except Exception as token_error: progress_placeholder.text(f"Trying with use_auth_token instead of token: {str(token_error)}") # Fall back to older parameter name model = FluxFillPipeline.from_pretrained( "black-forest-labs/FLUX.1-Fill-dev", torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, use_auth_token=token, device_map="auto", low_cpu_mem_usage=True, use_safetensors=True, variant="fp16" if torch.cuda.is_available() else None ) else: st.info("Attempting to load model without auth token (not recommended)...") progress_placeholder = st.empty() progress_placeholder.text("Starting model loading without token...") try: from diffusers import AutoPipelineForInpainting progress_placeholder.text("Loading components without token (1/4): Starting...") # Try using AutoPipelineForInpainting first with optimizations model = AutoPipelineForInpainting.from_pretrained( "black-forest-labs/FLUX.1-Fill-dev", torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, device_map="auto", low_cpu_mem_usage=True, use_safetensors=True, variant="fp16" if torch.cuda.is_available() else None ) progress_placeholder.text("Loading components without token (4/4): Success!") except Exception as auto_error: progress_placeholder.text(f"AutoPipeline without token failed: {str(auto_error)}") # Fallback to regular pipeline model = FluxFillPipeline.from_pretrained( "black-forest-labs/FLUX.1-Fill-dev", torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, device_map="auto", low_cpu_mem_usage=True, use_safetensors=True, variant="fp16" if torch.cuda.is_available() else None ) st.success("Model loaded successfully!") return model.to(device) except Exception as detailed_error: st.error(f"Detailed loading error: {str(detailed_error)}") # Try component-by-component loading as a final fallback st.info("Trying component-by-component loading as a final fallback...") comp_progress = st.empty() comp_progress.text("Starting component-by-component loading...") try: import signal from huggingface_hub import login, HfApi from contextlib import contextmanager # Define a timeout context manager @contextmanager def timeout(time_seconds): def _raise_timeout_error(signum, frame): raise TimeoutError(f"Function timed out after {time_seconds} seconds") # Only use signal on Unix-based systems if hasattr(signal, "SIGALRM"): signal.signal(signal.SIGALRM, _raise_timeout_error) signal.alarm(time_seconds) try: yield finally: signal.alarm(0) else: # On Windows just yield without timeout yield if token: # First log in explicitly comp_progress.text("Logging in with token...") try: hf_login(token) except Exception as login_error: comp_progress.error(f"Login error (non-critical): {str(login_error)}") # Continue anyway as the token parameter will be used # Verify login worked and we have access comp_progress.text("Verifying model access...") api = HfApi() try: # Check if we can access the model with timeout(30): # 30 seconds timeout for API check model_info_obj = api.model_info("black-forest-labs/FLUX.1-Fill-dev") comp_progress.text(f"Access verified to model: {model_info_obj.id if hasattr(model_info_obj, 'id') else 'black-forest-labs/FLUX.1-Fill-dev'}") except Exception as access_error: st.error(f"Authentication succeeded but access verification failed: {str(access_error)}") if "401" in str(access_error) or "access" in str(access_error).lower(): st.error("You need to request access to this model and wait for approval") # Try component-by-component loading approach from diffusers import ( UNet2DConditionModel, DDPMScheduler, AutoencoderKL, ControlNetModel ) from transformers import CLIPTextModel, CLIPTokenizer # Build a pipeline from individual components comp_progress.text("Loading individual components (1/6): Configuration") comp_progress.text("Loading individual components (2/6): Tokenizer") try: tokenizer = CLIPTokenizer.from_pretrained( "black-forest-labs/FLUX.1-Fill-dev", subfolder="tokenizer", token=token, cache_dir="model_cache" ) comp_progress.text("Loading individual components (3/6): Text Encoder") text_encoder = CLIPTextModel.from_pretrained( "black-forest-labs/FLUX.1-Fill-dev", subfolder="text_encoder", token=token, cache_dir="model_cache", torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32 ) comp_progress.text("Loading individual components (4/6): VAE") vae = AutoencoderKL.from_pretrained( "black-forest-labs/FLUX.1-Fill-dev", subfolder="vae", token=token, cache_dir="model_cache", torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32 ) comp_progress.text("Loading individual components (5/6): UNet") unet = UNet2DConditionModel.from_pretrained( "black-forest-labs/FLUX.1-Fill-dev", subfolder="unet", token=token, cache_dir="model_cache", torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32 ) comp_progress.text("Loading individual components (6/6): Scheduler") scheduler = DDPMScheduler.from_pretrained( "black-forest-labs/FLUX.1-Fill-dev", subfolder="scheduler", token=token, cache_dir="model_cache" ) # Now assemble the pipeline comp_progress.text("Assembling pipeline from components...") model = FluxFillPipeline( vae=vae, text_encoder=text_encoder, tokenizer=tokenizer, unet=unet, scheduler=scheduler, ) comp_progress.success("Successfully assembled pipeline from components!") except Exception as component_error: comp_progress.error(f"Component loading failed: {str(component_error)}") # Try one last approach with the auto pipeline comp_progress.text("Trying AutoPipelineForInpainting as last resort...") from diffusers import AutoPipelineForInpainting model = AutoPipelineForInpainting.from_pretrained( "black-forest-labs/FLUX.1-Fill-dev", torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, token=token if token else None, device_map="auto", low_cpu_mem_usage=True, use_safetensors=True, offload_folder="tmp_offload", # Offload to disk if needed variant="fp16" if torch.cuda.is_available() else None ) st.success("Successfully loaded model with alternative approach!") comp_progress.empty() except Exception as alt_error: st.error(f"Alternative loading approach failed: {str(alt_error)}") raise return model.to(device) except Exception as e: st.error(f"Error loading model: {str(e)}") if "401 Client Error" in str(e) or "401" in str(e) or "access" in str(e).lower() or "denied" in str(e).lower(): st.error(""" Access Denied: You need to: 1. Request access to the model at https://huggingface.co/black-forest-labs/FLUX.1-Fill-dev 2. Set up your Hugging Face token in Spaces: - Go to your Space settings > Secrets - Add a new secret with name 'HF_TOKEN' - Set its value to your Hugging Face API token 3. Wait for approval from model owners Note: You can find your token at https://huggingface.co/settings/tokens """) elif "Tried to instantiate class" in str(e): st.error(""" PyTorch class initialization error. Try restarting the app. If the error persists, try accessing the app from a different browser. """) st.stop() # Initialize model section with st.spinner("Loading model..."): try: pipe = load_model() st.success("Model loaded successfully!") except Exception as e: st.error(f"Failed to load model: {str(e)}") st.stop() def calculate_optimal_dimensions(image: Image.Image): # Extract the original dimensions original_width, original_height = image.size # Set constants MIN_ASPECT_RATIO = 9 / 16 MAX_ASPECT_RATIO = 16 / 9 FIXED_DIMENSION = 1024 # Calculate the aspect ratio of the original image original_aspect_ratio = original_width / original_height # Determine which dimension to fix if original_aspect_ratio > 1: # Wider than tall width = FIXED_DIMENSION height = round(FIXED_DIMENSION / original_aspect_ratio) else: # Taller than wide height = FIXED_DIMENSION width = round(FIXED_DIMENSION * original_aspect_ratio) # Ensure dimensions are multiples of 8 width = (width // 8) * 8 height = (height // 8) * 8 # Enforce aspect ratio limits calculated_aspect_ratio = width / height if calculated_aspect_ratio > MAX_ASPECT_RATIO: width = (height * MAX_ASPECT_RATIO // 8) * 8 elif calculated_aspect_ratio < MIN_ASPECT_RATIO: height = (width / MIN_ASPECT_RATIO // 8) * 8 # Ensure width and height remain above the minimum dimensions width = max(width, 576) if width == FIXED_DIMENSION else width height = max(height, 576) if height == FIXED_DIMENSION else height return width, height # Create two columns for layout col1, col2 = st.columns([1, 1]) with col1: # Upload image uploaded_file = st.file_uploader("Upload an image for inpainting", type=["jpg", "jpeg", "png"]) if uploaded_file is not None: # Display the uploaded image image = Image.open(uploaded_file).convert("RGB") st.image(image, caption="Uploaded Image", use_container_width=True) # Simple approach to create a mask - select a square area st.write("Select an area to inpaint:") # Get image dimensions img_width, img_height = image.size # Scale for display while maintaining aspect ratio display_height = 600 display_width = int(img_width * (display_height / img_height)) # Create sliders for selecting the area col_sliders1, col_sliders2 = st.columns(2) with col_sliders1: x1 = st.slider("Left edge (X1)", 0, img_width, img_width // 4) y1 = st.slider("Top edge (Y1)", 0, img_height, img_height // 4) with col_sliders2: x2 = st.slider("Right edge (X2)", x1, img_width, min(x1 + img_width // 2, img_width)) y2 = st.slider("Bottom edge (Y2)", y1, img_height, min(y1 + img_height // 2, img_height)) # Create a copy of the image to show the mask preview_img = image.copy() preview_mask = Image.new("L", image.size, 0) # Draw a white rectangle on the mask from PIL import ImageDraw draw = ImageDraw.Draw(preview_mask) draw.rectangle([(x1, y1), (x2, y2)], fill=255) # Show the mask on the image masked_preview = image.copy() # Add semi-transparent white overlay overlay = Image.new("RGBA", image.size, (255, 255, 255, 128)) masked_preview.paste(overlay, (0, 0), preview_mask) st.image(masked_preview, caption="Area to inpaint (white overlay)", use_container_width=True) # Prompt input prompt = st.text_input("Enter your prompt") # Example prompts examples = [ "a tiny astronaut hatching from an egg on the moon", "a cat holding a sign that says hello world", "an anime illustration of a wiener schnitzel", ] example_prompt = st.selectbox("Or select an example prompt", [""] + examples) if example_prompt and not prompt: prompt = example_prompt # Advanced settings with expander with st.expander("Advanced Settings"): randomize_seed = st.checkbox("Randomize seed", value=True) if not randomize_seed: seed = st.slider("Seed", 0, MAX_SEED, 0) else: seed = random.randint(0, MAX_SEED) guidance_scale = st.slider("Guidance Scale", 1.0, 30.0, 3.5, 0.5) num_inference_steps = st.slider("Number of inference steps", 1, 50, 28) # Run button run_button = st.button("Generate") with col2: if uploaded_file is not None: st.write("Result will appear here") if run_button and prompt: with st.spinner("Generating image..."): # Create mask from rectangle coordinates mask = Image.new("L", image.size, 0) draw = ImageDraw.Draw(mask) draw.rectangle([(x1, y1), (x2, y2)], fill=255) # Calculate dimensions for generation width, height = calculate_optimal_dimensions(image) # Progress bar progress_bar = st.progress(0) # Generate the image try: # Set up progress bar updates progress_text = st.empty() debug_info = st.empty() # Show parameters for debugging debug_info.info(f"Model type: {pipe.__class__.__name__}") # Update progress progress_bar.progress(0.1) progress_text.text("Preparing image and mask...") # Make sure mask is in the right format # Some models require masks where white (255) is the area to inpaint mask_img = mask.convert("L") # Prepare arguments - different models may have different parameter names model_class_name = pipe.__class__.__name__ # Common parameters for all models common_params = { "prompt": prompt, "image": image, "mask_image": mask_img, "num_inference_steps": num_inference_steps, "generator": torch.Generator("cpu").manual_seed(seed) } # Add parameters for Flux model common_params["guidance_scale"] = guidance_scale # Try running generation with dimensions try: progress_text.text("Running generation...") progress_bar.progress(0.2) # First try with dimensions common_params["height"] = int(height) common_params["width"] = int(width) result = pipe(**common_params) except Exception as e: debug_info.warning(f"First attempt failed: {str(e)}") progress_text.text("Retrying with adjusted parameters...") # Remove dimensions and try again del common_params["height"] del common_params["width"] result = pipe(**common_params) # Get the result image result_image = result.images[0] # Update final progress progress_bar.progress(1.0) progress_text.text("Complete!") debug_info.empty() # Clear debug info # Display the result st.image(result_image, caption="Generated Result", use_column_width=True) # Add download button buf = io.BytesIO() result_image.save(buf, format="PNG") st.download_button( label="Download result", data=buf.getvalue(), file_name="inpaint_result.png", mime="image/png", ) # Display used seed st.write(f"Seed used: {seed}") except Exception as e: st.error(f"An error occurred during generation: {str(e)}") st.error("Try adjusting the parameters or using a different image.") # If no image is uploaded if uploaded_file is None: with col2: st.write("Please upload an image first")