Spaces:
Sleeping
Sleeping
| # --- app.py (O Painel de Controle do Maestro - Produção em Lote com Diário de Bordo) --- | |
| # By Carlex & Gemini | |
| # --- Ato 1: A Convocação da Orquestra (Importações) --- | |
| import gradio as gr | |
| import torch | |
| import spaces | |
| import os | |
| import yaml | |
| from PIL import Image | |
| import shutil | |
| import gc | |
| import traceback | |
| import subprocess | |
| import math | |
| import google.generativeai as genai | |
| import numpy as np | |
| import imageio | |
| import tempfile | |
| from pathlib import Path | |
| from huggingface_hub import hf_hub_download | |
| import json | |
| from facexlib.utils.face_restoration_helper import FaceRestoreHelper | |
| import huggingface_hub | |
| import spaces | |
| import argparse | |
| import spaces | |
| import argparse | |
| import cv2 | |
| from facexlib.utils.face_restoration_helper import FaceRestoreHelper | |
| import huggingface_hub | |
| from dreamo.dreamo_pipeline import DreamOPipeline | |
| from dreamo.utils import img2tensor, resize_numpy_image_area, tensor2img, resize_numpy_image_long | |
| from tools import BEN2 | |
| # --- Músicos Originais (Sua implementação) --- | |
| from inference import create_ltx_video_pipeline, load_image_to_tensor_with_resize_and_crop, seed_everething, calculate_padding | |
| from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem | |
| # --- Ato 2: A Preparação do Palco (Configurações) --- | |
| config_file_path = "configs/ltxv-13b-0.9.8-distilled.yaml" | |
| with open(config_file_path, "r") as file: | |
| PIPELINE_CONFIG_YAML = yaml.safe_load(file) | |
| # --- Constantes Globais --- | |
| LTX_REPO = "Lightricks/LTX-Video" | |
| models_dir = "downloaded_models_gradio_cpu_init" | |
| Path(models_dir).mkdir(parents=True, exist_ok=True) | |
| WORKSPACE_DIR = "aduc_workspace" | |
| GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY") | |
| # --- Carregamento de Modelos LTX na CPU --- | |
| print("Baixando e criando pipelines LTX na CPU...") | |
| distilled_model_actual_path = hf_hub_download(repo_id=LTX_REPO, filename=PIPELINE_CONFIG_YAML["checkpoint_path"], local_dir=models_dir, local_dir_use_symlinks=False) | |
| pipeline_instance = create_ltx_video_pipeline(ckpt_path=distilled_model_actual_path, precision=PIPELINE_CONFIG_YAML["precision"], text_encoder_model_name_or_path=PIPELINE_CONFIG_YAML["text_encoder_model_name_or_path"], sampler=PIPELINE_CONFIG_YAML["sampler"], device="cpu") | |
| print("Modelos LTX prontos.") | |
| # --- Ato 3: As Partituras dos Músicos (Funções) --- | |
| def get_storyboard_from_director_v2(num_fragments: int, prompt: str, initial_image_path: str, progress=gr.Progress()): | |
| progress(0.5, desc="[Diretor Gemini] Criando o storyboard completo...") | |
| if not initial_image_path: raise gr.Error("Por favor, forneça uma imagem de referência inicial.") | |
| if not GEMINI_API_KEY: raise gr.Error("Chave da API Gemini (GEMINI_API_KEY) não configurada!") | |
| genai.configure(api_key=GEMINI_API_KEY) | |
| try: | |
| with open("prompts/director_storyboard_v2.txt", "r", encoding="utf-8") as f: template = f.read() | |
| except FileNotFoundError: raise gr.Error("'prompts/director_storyboard_v2.txt' não encontrado!") | |
| director_prompt = template.format(user_prompt=prompt, num_fragments=int(num_fragments)) | |
| model = genai.GenerativeModel('gemini-2.0-flash') | |
| img = Image.open(initial_image_path) | |
| response = model.generate_content([director_prompt, img]) | |
| try: | |
| cleaned_response = response.text.strip().replace("```json", "").replace("```", "") | |
| storyboard_data = json.loads(cleaned_response) | |
| storyboard_list = storyboard_data.get("storyboard", []) | |
| if not storyboard_list: raise gr.Error("A IA não retornou um storyboard válido.") | |
| return storyboard_list | |
| except (json.JSONDecodeError, KeyError, TypeError) as e: | |
| raise gr.Error(f"O Diretor retornou uma resposta inesperada. Erro: {e}\nResposta Bruta: {response.text}") | |
| def run_ltx_animation(current_fragment_index, motion_prompt, input_frame_path, height, width, fps, seed, cfg, progress=gr.Progress()): | |
| progress(0, desc=f"[Animador LTX] Aquecendo para a Cena {current_fragment_index}...") | |
| target_device = "cuda"; output_path = os.path.join(WORKSPACE_DIR, f"fragment_{current_fragment_index}.mp4") | |
| try: | |
| pipeline_instance.to(target_device) | |
| duration_fragment, target_frames_ideal = 3.0, 3.0 * fps | |
| n_val = round((float(round(target_frames_ideal)) - 1.0) / 8.0); actual_num_frames = max(9, min(int(n_val * 8 + 1), 257)) | |
| num_frames_padded = ((actual_num_frames - 2) // 8 + 1) * 8 + 1 | |
| padded_h, padded_w = ((int(height) - 1) // 32 + 1) * 32, ((int(width) - 1) // 32 + 1) * 32 | |
| padding_vals = calculate_padding(int(height), int(width), padded_h, padded_w) | |
| timesteps = PIPELINE_CONFIG_YAML.get("first_pass", {}).get("timesteps") | |
| kwargs = {"prompt": motion_prompt, "negative_prompt": "blurry, distorted", "height": padded_h, "width": padded_w, "num_frames": num_frames_padded, "frame_rate": int(fps), "generator": torch.Generator(device=target_device).manual_seed(int(seed) + current_fragment_index), "output_type": "pt", "guidance_scale": float(cfg), "timesteps": timesteps, "vae_per_channel_normalize": True, "decode_timestep": PIPELINE_CONFIG_YAML["decode_timestep"], "decode_noise_scale": PIPELINE_CONFIG_YAML["decode_noise_scale"], "stochastic_sampling": PIPELINE_CONFIG_YAML["stochastic_sampling"], "image_cond_noise_scale": 0.15, "is_video": True, "mixed_precision": (PIPELINE_CONFIG_YAML["precision"] == "mixed_precision"), "offload_to_cpu": False, "enhance_prompt": False} | |
| media_tensor = load_image_to_tensor_with_resize_and_crop(input_frame_path, int(height), int(width)); media_tensor = torch.nn.functional.pad(media_tensor, padding_vals); kwargs["conditioning_items"] = [ConditioningItem(media_tensor.to(target_device), 0, 1.0)] | |
| result_tensor = pipeline_instance(**kwargs).images | |
| pad_l, pad_r, pad_t, pad_b = padding_vals; slice_h, slice_w = (-pad_b if pad_b > 0 else None), (-pad_r if pad_r > 0 else None) | |
| cropped_tensor = result_tensor[:, :, :actual_num_frames, pad_t:slice_h, pad_l:slice_w]; video_np = (cropped_tensor[0].permute(1, 2, 3, 0).cpu().float().numpy() * 255).astype(np.uint8) | |
| with imageio.get_writer(output_path, fps=int(fps), codec='libx264', quality=8) as writer: | |
| for i, frame in enumerate(video_np): progress(i / len(video_np), desc=f"Renderizando frame {i+1}/{len(video_np)}..."); writer.append_data(frame) | |
| return output_path | |
| finally: | |
| pipeline_instance.to("cpu"); gc.collect(); torch.cuda.empty_cache() | |
| def concatenate_masterpiece(fragment_paths: list, progress=gr.Progress()): | |
| progress(0.5, desc="Montando a obra-prima final..."); list_file_path, final_output_path = os.path.join(WORKSPACE_DIR, "concat_list.txt"), os.path.join(WORKSPACE_DIR, "obra_prima_final.mp4") | |
| with open(list_file_path, "w") as f: | |
| for path in fragment_paths: f.write(f"file '{os.path.abspath(path)}'\n") | |
| command = f"ffmpeg -y -f concat -safe 0 -i {list_file_path} -c copy {final_output_path}" | |
| try: | |
| subprocess.run(command, shell=True, check=True, capture_output=True, text=True); return final_output_path | |
| except subprocess.CalledProcessError as e: | |
| raise gr.Error(f"FFmpeg falhou ao unir os vídeos: {e.stderr}") | |
| def run_full_production(storyboard, ref_img_path, height, width, fps, seed, cfg): | |
| if not storyboard: raise gr.Error("Nenhum roteiro para produzir.") | |
| if not ref_img_path: raise gr.Error("Nenhuma imagem de referência definida.") | |
| video_fragments, log_history = [], "" | |
| for i, motion_prompt in enumerate(storyboard): | |
| log_message = f"Iniciando produção da Cena {i+1}/{len(storyboard)}..." | |
| log_history += log_message + "\n" | |
| yield {production_log_output: gr.update(value=log_history)} | |
| fragment_path = run_ltx_animation(i + 1, motion_prompt, ref_img_path, height, width, fps, seed, cfg, gr.Progress()) | |
| video_fragments.append(fragment_path) | |
| log_message = f"Cena {i+1} concluída e salva em {os.path.basename(fragment_path)}." | |
| log_history += log_message + "\n" | |
| yield {production_log_output: gr.update(value=log_history), fragment_gallery_output: gr.update(value=video_fragments), fragment_list_state: video_fragments, final_fragments_display: gr.update(value=video_fragments)} | |
| log_history += "\nProdução de todas as cenas concluída!" | |
| yield {production_log_output: gr.update(value=log_history)} | |
| # --- Ato 4: A Apresentação (UI do Gradio) --- | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# LTX Video - Storyboard em Vídeo (ADUC-SDR)\n*By Carlex & Gemini*") | |
| storyboard_state = gr.State([]) | |
| reference_image_state = gr.State("") | |
| fragment_list_state = gr.State([]) | |
| if os.path.exists(WORKSPACE_DIR): shutil.rmtree(WORKSPACE_DIR) | |
| os.makedirs(WORKSPACE_DIR) | |
| with gr.Tabs(): | |
| with gr.TabItem("ETAPA 1: O DIRETOR (Roteiro Visual)"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| num_fragments_input = gr.Slider(2, 10, 4, step=1, label="Número de Cenas (Fragmentos)") | |
| prompt_input = gr.Textbox(label="Ideia Geral (Prompt)") | |
| image_input = gr.Image(type="filepath", label="Imagem de Referência") | |
| director_button = gr.Button("▶️ Gerar Roteiro Visual (Gemini)", variant="primary") | |
| with gr.Column(): | |
| storyboard_output = gr.JSON(label="Roteiro Visual Gerado (Storyboard)") | |
| with gr.TabItem("ETAPA 2: A PRODUÇÃO (Gerar Cenas em Vídeo)"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| storyboard_to_render = gr.JSON(label="Roteiro a ser Produzido") | |
| animator_button = gr.Button("▶️ Produzir TODAS as Cenas (LTX)", variant="primary") | |
| production_log_output = gr.Textbox(label="Diário de Bordo da Produção", lines=5, interactive=False, placeholder="Aguardando início da produção...") | |
| with gr.Column(): | |
| fragment_gallery_output = gr.Gallery(label="Cenas Produzidas (Fragmentos de Vídeo)", object_fit="contain", height="auto") | |
| with gr.Row(): | |
| height_slider = gr.Slider(256, 1024, 512, step=32, label="Altura") | |
| width_slider = gr.Slider(256, 1024, 512, step=32, label="Largura") | |
| with gr.Row(): | |
| fps_slider = gr.Slider(8, 24, 15, step=1, label="FPS") | |
| seed_number = gr.Number(42, label="Seed") | |
| cfg_slider = gr.Slider(1.0, 10.0, 2.5, step=0.1, label="CFG") | |
| with gr.TabItem("ETAPA 3: PÓS-PRODUÇÃO"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| final_fragments_display = gr.JSON(label="Vídeos a Concatenar") | |
| editor_button = gr.Button("▶️ Concatenar Tudo (FFmpeg)", variant="primary") | |
| with gr.Column(): | |
| final_video_output = gr.Video(label="A Obra-Prima Final") | |
| # --- Ato 5: A Regência (Lógica de Conexão dos Botões) --- | |
| def director_success(img_path, storyboard_json): | |
| if not img_path: raise gr.Error("A imagem de referência é necessária.") | |
| storyboard_list = storyboard_json if isinstance(storyboard_json, list) else storyboard_json.get("storyboard", []) | |
| if not storyboard_list: raise gr.Error("O storyboard está vazio.") | |
| return storyboard_list, img_path, gr.update(value=storyboard_json) | |
| director_button.click( | |
| fn=get_storyboard_from_director_v2, | |
| inputs=[num_fragments_input, prompt_input, image_input], | |
| outputs=[storyboard_output] | |
| ).success( | |
| fn=director_success, | |
| inputs=[image_input, storyboard_output], | |
| outputs=[storyboard_state, reference_image_state, storyboard_to_render] | |
| ) | |
| animator_button.click( | |
| fn=run_full_production, | |
| inputs=[storyboard_state, reference_image_state, height_slider, width_slider, fps_slider, seed_number, cfg_slider], | |
| outputs=[production_log_output, fragment_gallery_output, fragment_list_state, final_fragments_display] | |
| ) | |
| editor_button.click( | |
| fn=concatenate_masterpiece, | |
| inputs=[fragment_list_state], | |
| outputs=[final_video_output] | |
| ) | |
| if __name__ == "__main__": | |
| demo.queue().launch(server_name="0.0.0.0", share=True) |