Spaces:
Running on Zero
Running on Zero
| import os | |
| import subprocess | |
| import sys | |
| # Disable torch.compile / dynamo before any torch import | |
| os.environ["TORCH_COMPILE_DISABLE"] = "1" | |
| os.environ["TORCHDYNAMO_DISABLE"] = "1" | |
| # Clone LTX-2 repo and install packages | |
| LTX_REPO_URL = "https://github.com/Lightricks/LTX-2.git" | |
| LTX_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LTX-2") | |
| if not os.path.exists(LTX_REPO_DIR): | |
| print(f"Cloning {LTX_REPO_URL}...") | |
| subprocess.run(["git", "clone", "--depth", "1", LTX_REPO_URL, LTX_REPO_DIR], check=True) | |
| print("Installing ltx-core and ltx-pipelines from cloned repo...") | |
| subprocess.run( | |
| [sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps", "-e", | |
| os.path.join(LTX_REPO_DIR, "packages", "ltx-core"), | |
| "-e", os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines")], | |
| check=True, | |
| ) | |
| sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines", "src")) | |
| sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-core", "src")) | |
| import logging | |
| import random | |
| import tempfile | |
| from pathlib import Path | |
| import torch | |
| torch._dynamo.config.suppress_errors = True | |
| torch._dynamo.config.disable = True | |
| import spaces | |
| import gradio as gr | |
| import numpy as np | |
| from gradio_client import Client, handle_file | |
| from huggingface_hub import hf_hub_download | |
| from ltx_core.model.video_vae import TilingConfig, get_video_chunks_number | |
| from ltx_core.quantization import QuantizationPolicy | |
| from ltx_core.text_encoders.gemma.embeddings_processor import EmbeddingsProcessorOutput | |
| from ltx_pipelines.distilled import DistilledPipeline | |
| from ltx_pipelines.utils import helpers as pipeline_helpers | |
| from ltx_pipelines.utils.args import ImageConditioningInput | |
| from ltx_pipelines.utils.media_io import encode_video | |
| logging.getLogger().setLevel(logging.INFO) | |
| MAX_SEED = np.iinfo(np.int32).max | |
| DEFAULT_PROMPT = ( | |
| "An astronaut hatches from a fragile egg on the surface of the Moon, " | |
| "the shell cracking and peeling apart in gentle low-gravity motion. " | |
| "Fine lunar dust lifts and drifts outward with each movement, floating " | |
| "in slow arcs before settling back onto the ground. The astronaut pushes " | |
| "free in a deliberate, weightless motion, small fragments of the egg " | |
| "tumbling and spinning through the air." | |
| ) | |
| DEFAULT_HEIGHT = 1024 | |
| DEFAULT_WIDTH = 1536 | |
| DEFAULT_FRAME_RATE = 24.0 | |
| # Model repo | |
| LTX_MODEL_REPO = "diffusers-internal-dev/ltx-23" | |
| # Text encoder space URL - must be a 2.3-compatible text encoder | |
| TEXT_ENCODER_SPACE = "multimodalart/gemma-text-encoder-ltx23" | |
| # Download model checkpoints | |
| print("=" * 80) | |
| print("Downloading LTX-2.3 distilled model...") | |
| print("=" * 80) | |
| checkpoint_path = hf_hub_download(repo_id=LTX_MODEL_REPO, filename="ltx-2.3-22b-distilled.safetensors") | |
| spatial_upsampler_path = hf_hub_download(repo_id=LTX_MODEL_REPO, filename="ltx-2.3-spatial-upscaler-x2-1.0.safetensors") | |
| print(f"Checkpoint: {checkpoint_path}") | |
| print(f"Spatial upsampler: {spatial_upsampler_path}") | |
| # Initialize pipeline WITHOUT text encoder (gemma_root=None) | |
| # Text encoding will be done by external space | |
| pipeline = DistilledPipeline( | |
| distilled_checkpoint_path=checkpoint_path, | |
| spatial_upsampler_path=spatial_upsampler_path, | |
| gemma_root=None, | |
| loras=[], | |
| quantization=QuantizationPolicy.fp8_cast(), | |
| ) | |
| # Preload all models so first request is fast. | |
| # On ZeroGPU, .to('cuda') is intercepted and actual GPU allocation | |
| # happens inside the @spaces.GPU decorated function. | |
| print("Preloading models...") | |
| ledger = pipeline.model_ledger | |
| _transformer = ledger.transformer() | |
| _video_encoder = ledger.video_encoder() | |
| _video_decoder = ledger.video_decoder() | |
| _audio_decoder = ledger.audio_decoder() | |
| _vocoder = ledger.vocoder() | |
| _spatial_upsampler = ledger.spatial_upsampler() | |
| ledger.transformer = lambda: _transformer | |
| ledger.video_encoder = lambda: _video_encoder | |
| ledger.video_decoder = lambda: _video_decoder | |
| ledger.audio_decoder = lambda: _audio_decoder | |
| ledger.vocoder = lambda: _vocoder | |
| ledger.spatial_upsampler = lambda: _spatial_upsampler | |
| print("All models preloaded!") | |
| # Connect to text encoder space | |
| print(f"Connecting to text encoder space: {TEXT_ENCODER_SPACE}") | |
| try: | |
| text_encoder_client = Client(TEXT_ENCODER_SPACE) | |
| print("Text encoder client connected!") | |
| except Exception as e: | |
| print(f"Warning: Could not connect to text encoder space: {e}") | |
| text_encoder_client = None | |
| print("=" * 80) | |
| print("Pipeline ready!") | |
| print("=" * 80) | |
| def generate_video( | |
| input_image, | |
| prompt: str, | |
| duration: float, | |
| enhance_prompt: bool = True, | |
| seed: int = 42, | |
| randomize_seed: bool = True, | |
| height: int = DEFAULT_HEIGHT, | |
| width: int = DEFAULT_WIDTH, | |
| progress=gr.Progress(track_tqdm=True), | |
| ): | |
| """Generate a video based on the given parameters.""" | |
| try: | |
| current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed) | |
| frame_rate = DEFAULT_FRAME_RATE | |
| num_frames = int(duration * frame_rate) + 1 | |
| # 8k+1 format | |
| num_frames = ((num_frames - 1 + 7) // 8) * 8 + 1 | |
| # Handle image input | |
| images = [] | |
| temp_image_path = None | |
| if input_image is not None: | |
| output_dir = Path("outputs") | |
| output_dir.mkdir(exist_ok=True) | |
| temp_image_path = output_dir / f"temp_input_{current_seed}.jpg" | |
| if hasattr(input_image, "save"): | |
| input_image.save(temp_image_path) | |
| else: | |
| temp_image_path = Path(input_image) | |
| images = [ImageConditioningInput(path=str(temp_image_path), frame_idx=0, strength=1.0)] | |
| # Get embeddings from text encoder space | |
| print(f"Encoding prompt: {prompt}") | |
| if text_encoder_client is None: | |
| raise RuntimeError( | |
| f"Text encoder client not connected. Please ensure the text encoder space " | |
| f"({TEXT_ENCODER_SPACE}) is running and accessible." | |
| ) | |
| try: | |
| image_input = None | |
| if temp_image_path is not None: | |
| image_input = handle_file(str(temp_image_path)) | |
| result = text_encoder_client.predict( | |
| prompt=prompt, | |
| enhance_prompt=enhance_prompt, | |
| input_image=image_input, | |
| seed=current_seed, | |
| negative_prompt="", | |
| api_name="/encode_prompt", | |
| ) | |
| embedding_path = result[0] | |
| print(f"Embeddings received from: {embedding_path}") | |
| embeddings = torch.load(embedding_path) | |
| video_context = embeddings["video_context"].to("cuda") | |
| audio_context = embeddings["audio_context"] | |
| if audio_context is not None: | |
| audio_context = audio_context.to("cuda") | |
| print("Embeddings loaded successfully") | |
| except Exception as e: | |
| raise RuntimeError( | |
| f"Failed to get embeddings from text encoder space: {e}\n" | |
| f"Please ensure {TEXT_ENCODER_SPACE} is running properly." | |
| ) | |
| # Monkey-patch encode_prompts to return pre-computed embeddings | |
| # instead of loading the text encoder + embeddings processor | |
| precomputed = EmbeddingsProcessorOutput( | |
| video_encoding=video_context, | |
| audio_encoding=audio_context, | |
| attention_mask=torch.ones(1, device="cuda"), # dummy mask | |
| ) | |
| original_encode_prompts = pipeline_helpers.encode_prompts | |
| pipeline_helpers.encode_prompts = lambda *args, **kwargs: [precomputed] | |
| try: | |
| tiling_config = TilingConfig.default() | |
| video_chunks_number = get_video_chunks_number(num_frames, tiling_config) | |
| video, audio = pipeline( | |
| prompt=prompt, | |
| seed=current_seed, | |
| height=height, | |
| width=width, | |
| num_frames=num_frames, | |
| frame_rate=frame_rate, | |
| images=images, | |
| tiling_config=tiling_config, | |
| enhance_prompt=False, # Already enhanced by text encoder space | |
| ) | |
| output_path = tempfile.mktemp(suffix=".mp4") | |
| encode_video( | |
| video=video, | |
| fps=frame_rate, | |
| audio=audio, | |
| output_path=output_path, | |
| video_chunks_number=video_chunks_number, | |
| ) | |
| return str(output_path), current_seed | |
| finally: | |
| # Restore original encode_prompts | |
| pipeline_helpers.encode_prompts = original_encode_prompts | |
| except Exception as e: | |
| import traceback | |
| error_msg = f"Error: {str(e)}\n{traceback.format_exc()}" | |
| print(error_msg) | |
| return None, current_seed | |
| with gr.Blocks(title="LTX-2.3 Distilled") as demo: | |
| gr.Markdown("# LTX-2.3 Distilled (22B): Fast Audio-Video Generation") | |
| gr.Markdown( | |
| "Fast video + audio generation using the distilled model (8 steps stage 1, 4 steps stage 2). " | |
| "[[model]](https://huggingface.co/Lightricks/LTX-2) " | |
| "[[code]](https://github.com/Lightricks/LTX-2)" | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| input_image = gr.Image(label="Input Image (Optional)", type="pil") | |
| prompt = gr.Textbox( | |
| label="Prompt", | |
| info="for best results - make it as elaborate as possible", | |
| value="Make this image come alive with cinematic motion, smooth animation", | |
| lines=3, | |
| placeholder="Describe the motion and animation you want...", | |
| ) | |
| with gr.Row(): | |
| duration = gr.Slider(label="Duration (seconds)", minimum=1.0, maximum=10.0, value=3.0, step=0.1) | |
| enhance_prompt = gr.Checkbox(label="Enhance Prompt", value=True) | |
| generate_btn = gr.Button("Generate Video", variant="primary", size="lg") | |
| with gr.Accordion("Advanced Settings", open=False): | |
| seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, value=10, step=1) | |
| randomize_seed = gr.Checkbox(label="Randomize Seed", value=True) | |
| with gr.Row(): | |
| width = gr.Number(label="Width", value=DEFAULT_WIDTH, precision=0) | |
| height = gr.Number(label="Height", value=DEFAULT_HEIGHT, precision=0) | |
| with gr.Column(): | |
| output_video = gr.Video(label="Generated Video", autoplay=True) | |
| generate_btn.click( | |
| fn=generate_video, | |
| inputs=[ | |
| input_image, prompt, duration, enhance_prompt, | |
| seed, randomize_seed, height, width, | |
| ], | |
| outputs=[output_video, seed], | |
| ) | |
| css = """ | |
| .gradio-container .contain{max-width: 1200px !important; margin: 0 auto !important} | |
| """ | |
| if __name__ == "__main__": | |
| demo.launch(theme=gr.themes.Citrus(), css=css) | |