Spaces:
Sleeping
Sleeping
| # --- app.py (O Painel de Controle do Maestro - Depuração Focada) --- | |
| # By Carlex & Gemini & DreamO | |
| # ... (importações e inicializações inalteradas) ... | |
| import gradio as gr | |
| import torch | |
| import os | |
| import yaml | |
| from PIL import Image | |
| import shutil | |
| import gc | |
| import subprocess | |
| import math | |
| import google.generativeai as genai | |
| import numpy as np | |
| import imageio | |
| from pathlib import Path | |
| import huggingface_hub | |
| import json | |
| from inference import create_ltx_video_pipeline, load_image_to_tensor_with_resize_and_crop, seed_everething, calculate_padding | |
| from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem | |
| from dreamo_helpers import dreamo_generator_singleton | |
| # ... (configurações e constantes inalteradas) ... | |
# Pipeline settings are read once, at import time, from the distilled LTX YAML config.
config_file_path = "configs/ltxv-13b-0.9.8-distilled.yaml"
with open(config_file_path, "r") as file:
    PIPELINE_CONFIG_YAML = yaml.safe_load(file)
# Hugging Face repository the checkpoint is downloaded from.
LTX_REPO = "Lightricks/LTX-Video"
# Local directory that receives the downloaded checkpoint; created if missing.
models_dir = "downloaded_models_gradio_cpu_init"
Path(models_dir).mkdir(parents=True, exist_ok=True)
# Directory where keyframes, video fragments and the final video are written.
WORKSPACE_DIR = "aduc_workspace"
# Gemini API key taken from the environment; None when the variable is not set.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
# Video constants. Note that run_full_video_production takes the actual render
# size from the first keyframe image, not from VIDEO_WIDTH / VIDEO_HEIGHT.
VIDEO_WIDTH = 720
VIDEO_HEIGHT = 720
VIDEO_FPS = 24
VIDEO_DURATION_SECONDS = 4
# Frames per clip: duration (s) * frame rate.
VIDEO_TOTAL_FRAMES = VIDEO_DURATION_SECONDS * VIDEO_FPS
print("Baixando e criando pipelines LTX na CPU...")
# Download the checkpoint named in the YAML config into models_dir.
distilled_model_actual_path = huggingface_hub.hf_hub_download(repo_id=LTX_REPO, filename=PIPELINE_CONFIG_YAML["checkpoint_path"], local_dir=models_dir, local_dir_use_symlinks=False)
# The pipeline is created on the CPU; run_ltx_animation moves it to the target
# device for each generation and back to the CPU afterwards.
pipeline_instance = create_ltx_video_pipeline(ckpt_path=distilled_model_actual_path, precision=PIPELINE_CONFIG_YAML["precision"], text_encoder_model_name_or_path=PIPELINE_CONFIG_YAML["text_encoder_model_name_or_path"], sampler=PIPELINE_CONFIG_YAML["sampler"], device='cpu')
print("Modelos LTX prontos (na CPU).")
| # --- Ato 3: As Partituras dos Músicos (Funções) --- | |
| # ... (get_storyboard_from_director e run_keyframe_generation inalterados) ... | |
def get_storyboard_from_director(num_fragments: int, prompt: str, initial_image_path: str, progress=gr.Progress()):
    """Ask Gemini for a storyboard based on a text idea and a reference image.

    The prompt template is read from ``prompts/director_storyboard_v2.txt``
    (next to this script) and filled with ``user_prompt`` and
    ``num_fragments``. The model reply is expected to be a JSON object,
    optionally wrapped in Markdown code fences.

    Args:
        num_fragments: Number of scenes requested; converted with ``int()``.
        prompt: The user's general idea for the video.
        initial_image_path: File path of the reference image sent to the model.
        progress: Gradio progress tracker.

    Returns:
        The value stored under the ``"storyboard"`` key of the reply, or an
        empty list when that key is missing.

    Raises:
        gr.Error: If the image path or API key is missing, the prompt template
            file does not exist, or the reply cannot be read or parsed as a
            JSON object.
    """
    progress(0.5, desc="[Diretor Gemini] Criando o storyboard...")
    if not initial_image_path:
        raise gr.Error("Por favor, forneça uma imagem de referência inicial.")
    if not GEMINI_API_KEY:
        raise gr.Error("Chave da API Gemini não configurada!")
    genai.configure(api_key=GEMINI_API_KEY)

    script_dir = os.path.dirname(os.path.abspath(__file__))
    prompt_file_path = os.path.join(script_dir, "prompts", "director_storyboard_v2.txt")
    try:
        with open(prompt_file_path, "r", encoding="utf-8") as f:
            template = f.read()
    except FileNotFoundError as exc:
        raise gr.Error(f"Arquivo de prompt não encontrado em '{prompt_file_path}'!") from exc

    director_prompt = template.format(user_prompt=prompt, num_fragments=int(num_fragments))
    model = genai.GenerativeModel('gemini-2.5-flash')
    # Keep the image open only for the duration of the request so the file
    # handle is released afterwards.
    with Image.open(initial_image_path) as img:
        response = model.generate_content([director_prompt, img])

    # Read the reply text exactly once: accessing ``response.text`` can itself
    # raise ValueError (e.g. when the reply has no usable text part), and
    # re-reading it while building an error message would raise again and hide
    # the user-facing gr.Error.
    try:
        raw_text = response.text
    except ValueError as exc:
        raise gr.Error(f"O Diretor não retornou texto utilizável. Erro: {exc}") from exc

    try:
        # Drop Markdown code fences the model may wrap around the JSON.
        cleaned_response = raw_text.strip().replace("```json", "").replace("```", "").strip()
        if not cleaned_response:
            raise ValueError("A resposta do Gemini estava vazia após a limpeza.")
        storyboard_data = json.loads(cleaned_response)
        if not isinstance(storyboard_data, dict):
            raise ValueError("A resposta do Gemini não é um objeto JSON.")
    except (json.JSONDecodeError, ValueError) as e:
        raise gr.Error(f"O Diretor retornou uma resposta inválida. Erro: {e}. Resposta Bruta: '{raw_text}'") from e
    return storyboard_data.get("storyboard", [])
def _load_rgb_array(image_path):
    """Decode an image file into an RGB numpy array and close the file."""
    with Image.open(image_path) as source:
        return np.array(source.convert("RGB"))


def run_keyframe_generation(storyboard, ref_img_path_1, ref_img_path_2, ref_task_1, ref_task_2):
    """Generate one keyframe image per storyboard prompt with DreamO.

    This is a generator used as a Gradio event handler: it yields dictionaries
    keyed by the UI components ``keyframe_log_output``,
    ``keyframe_gallery_output`` and ``keyframe_images_state`` so the log and
    gallery refresh after every scene.

    Args:
        storyboard: Sequence of scene prompts.
        ref_img_path_1: Path of the mandatory first reference image; its size,
            rounded down to a multiple of 32, sets the output resolution.
        ref_img_path_2: Optional path of a second reference image.
        ref_task_1: DreamO task passed along for reference 1.
        ref_task_2: DreamO task passed along for reference 2.

    Raises:
        gr.Error: If the storyboard or reference 1 is missing, or reference 1
            is smaller than 32 pixels on either side.
    """
    if not storyboard:
        raise gr.Error("Nenhum roteiro para gerar imagens-chave.")
    if not ref_img_path_1:
        raise gr.Error("A Referência 1 é obrigatória.")

    # Decode each reference once, instead of re-opening the files for every
    # scene, and release the file handles right away.
    # NOTE(review): assumes the DreamO generator does not modify the arrays in
    # place between calls - confirm against dreamo_helpers.
    with Image.open(ref_img_path_1) as img:
        width, height = img.size
        ref_image1_np = np.array(img.convert("RGB"))
    ref_image2_np = _load_rgb_array(ref_img_path_2) if ref_img_path_2 else None

    # Round both sides down to a multiple of 32.
    width = (width // 32) * 32
    height = (height // 32) * 32
    if width == 0 or height == 0:
        # Rounding down would otherwise request a 0-pixel image.
        raise gr.Error("A Referência 1 deve ter pelo menos 32x32 pixels.")

    keyframe_paths, log_history = [], ""
    total_scenes = len(storyboard)
    try:
        dreamo_generator_singleton.to_gpu()
        for scene_number, prompt in enumerate(storyboard, start=1):
            log_history += f"Pintando Cena {scene_number}/{total_scenes} com DreamO ({width}x{height})..." + "\n"
            yield {keyframe_log_output: gr.update(value=log_history)}

            output_path = os.path.join(WORKSPACE_DIR, f"keyframe_image_{scene_number}.png")
            image = dreamo_generator_singleton.generate_image_with_gpu_management(
                ref_image1_np=ref_image1_np,
                ref_image2_np=ref_image2_np,
                ref_task1=ref_task_1, ref_task2=ref_task_2,
                prompt=prompt, width=width, height=height
            )
            image.save(output_path)
            keyframe_paths.append(output_path)

            log_history += f"Cena {scene_number} pintada." + "\n"
            yield {keyframe_log_output: gr.update(value=log_history), keyframe_gallery_output: gr.update(value=keyframe_paths), keyframe_images_state: keyframe_paths}
    finally:
        # Always move the generator back to the CPU, even when a scene fails.
        dreamo_generator_singleton.to_cpu()
    log_history += "\nPintura de todas as cenas concluída!"
    yield {keyframe_log_output: gr.update(value=log_history)}
def run_ltx_animation(current_fragment_index, motion_prompt, conditioning_items_data, width, height, seed, cfg, progress=gr.Progress()):
    """Render one video fragment with the LTX pipeline and save it as MP4.

    Args:
        current_fragment_index: Scene number; used in the output file name and
            added to ``seed`` so each fragment gets its own generator seed.
        motion_prompt: Text prompt for this fragment.
        conditioning_items_data: Iterable of ``(image_path, start_frame,
            strength)`` tuples turned into ``ConditioningItem`` objects.
        width: Target frame width in pixels (before padding).
        height: Target frame height in pixels (before padding).
        seed: Base seed; converted with ``int()``.
        cfg: Guidance scale; converted with ``float()``.
        progress: Gradio progress tracker.

    Returns:
        Path of the written file, ``<WORKSPACE_DIR>/fragment_<index>.mp4``.
    """
    progress(0, desc=f"[Animador LTX] Gerando Cena {current_fragment_index}...")
    output_path = os.path.join(WORKSPACE_DIR, f"fragment_{current_fragment_index}.mp4")
    target_device = 'cuda' if torch.cuda.is_available() else 'cpu'
    try:
        pipeline_instance.to(target_device)

        conditioning_items = []
        for (path, start_frame, strength) in conditioning_items_data:
            tensor = load_image_to_tensor_with_resize_and_crop(path, height, width)
            conditioning_items.append(ConditioningItem(tensor.to(target_device), start_frame, strength))

        # Frame count requested from the pipeline has the form 8*n + 1.
        n_val = round((float(VIDEO_TOTAL_FRAMES) - 1.0) / 8.0)
        actual_num_frames = int(n_val * 8 + 1)

        # Round the spatial size up to the next multiple of 32 and pad the
        # conditioning images to match.
        padded_h, padded_w = ((height - 1) // 32 + 1) * 32, ((width - 1) // 32 + 1) * 32
        padding_vals = calculate_padding(height, width, padded_h, padded_w)
        for cond_item in conditioning_items:
            cond_item.media_item = torch.nn.functional.pad(cond_item.media_item, padding_vals)

        timesteps = PIPELINE_CONFIG_YAML.get("first_pass", {}).get("timesteps")
        kwargs = {
            "prompt": motion_prompt,
            "negative_prompt": "blurry, distorted, bad quality, artifacts",
            "height": padded_h,
            "width": padded_w,
            "num_frames": actual_num_frames,
            "frame_rate": VIDEO_FPS,
            "generator": torch.Generator(device=target_device).manual_seed(int(seed) + current_fragment_index),
            "output_type": "pt",
            "guidance_scale": float(cfg),
            "timesteps": timesteps,
            "conditioning_items": conditioning_items,
            "vae_per_channel_normalize": True,
            "decode_timestep": PIPELINE_CONFIG_YAML["decode_timestep"],
            "decode_noise_scale": PIPELINE_CONFIG_YAML["decode_noise_scale"],
            "stochastic_sampling": PIPELINE_CONFIG_YAML["stochastic_sampling"],
            "image_cond_noise_scale": 0.15,
            "is_video": True,
            "mixed_precision": (PIPELINE_CONFIG_YAML["precision"] == "mixed_precision"),
            "offload_to_cpu": False,
            "enhance_prompt": False,
        }
        result_tensor = pipeline_instance(**kwargs).images

        # Remove the padding again and keep at most VIDEO_TOTAL_FRAMES frames
        # (frames are on axis 2, height on axis 3, width on axis 4).
        pad_l, pad_r, pad_t, pad_b = padding_vals
        slice_h = -pad_b if pad_b > 0 else None
        slice_w = -pad_r if pad_r > 0 else None
        cropped_tensor = result_tensor[:, :, :VIDEO_TOTAL_FRAMES, pad_t:slice_h, pad_l:slice_w]

        # Reorder the first batch entry so frames come first and the former
        # leading axis comes last (presumably channels - confirm).
        video_np = cropped_tensor[0].permute(1, 2, 3, 0).cpu().float().numpy()
        # Clamp before the uint8 cast: the * 255 scaling assumes values in
        # [0, 1], and anything slightly outside that range would otherwise
        # wrap around and show up as speckle artifacts.
        video_np = (np.clip(video_np, 0.0, 1.0) * 255).astype(np.uint8)

        with imageio.get_writer(output_path, fps=VIDEO_FPS, codec='libx264', quality=8) as writer:
            for i, frame in enumerate(video_np):
                progress(i / len(video_np), desc=f"Renderizando frame {i+1}/{len(video_np)}...")
                writer.append_data(frame)
        return output_path
    finally:
        # Always hand the pipeline back to the CPU and release cached memory.
        pipeline_instance.to('cpu')
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
| # <<<< FUNÇÃO DE PRODUÇÃO SIMPLIFICADA PARA DEPURAÇÃO >>>> | |
def run_full_video_production(storyboard, keyframe_image_paths, seed, cfg):
    """Produce one video fragment per keyframe, streaming a log to the UI.

    Generator used as a Gradio event handler. For every consecutive pair of
    keyframes it renders a fragment conditioned on both images (first and last
    frame); the last keyframe gets one extra fragment conditioned only on its
    first frame. Each yield is a dict updating ``video_production_log_output``
    and ``fragment_list_state``.

    Args:
        storyboard: Scene prompts, one per keyframe.
        keyframe_image_paths: Keyframe image paths, same length as
            ``storyboard``; the first image sets the render size.
        seed: Base seed forwarded to ``run_ltx_animation``.
        cfg: Guidance scale forwarded to ``run_ltx_animation``.

    Raises:
        gr.Error: If either input is empty or their lengths differ.
    """
    if not storyboard or not keyframe_image_paths:
        raise gr.Error("Roteiro e/ou imagens-chave estão faltando.")
    if len(storyboard) != len(keyframe_image_paths):
        raise gr.Error("A contagem de prompts do roteiro e imagens-chave não coincide.")

    with Image.open(keyframe_image_paths[0]) as first_keyframe:
        width, height = first_keyframe.size

    video_fragments = []
    log_history = ""

    def _report(line):
        # Append one line to the running log and build the UI update payload.
        nonlocal log_history
        log_history += line + "\n"
        return {video_production_log_output: gr.update(value=log_history), fragment_list_state: video_fragments}

    num_keyframes = len(keyframe_image_paths)
    # Index of the last generated frame; the frame count has the form 8*n + 1.
    segment_count = round((float(VIDEO_TOTAL_FRAMES) - 1.0) / 8.0)
    end_frame_index = int(segment_count * 8 + 1) - 1

    # Interpolated scenes: from each keyframe to the next one.
    consecutive_pairs = zip(keyframe_image_paths, keyframe_image_paths[1:])
    for scene_number, (start_image_path, end_image_path) in enumerate(consecutive_pairs, start=1):
        motion_prompt = storyboard[scene_number - 1]
        yield _report(f"Preparando Cena de Interpolação {scene_number}/{num_keyframes}...")

        conditioning_items_data = [(start_image_path, 0, 1.0), (end_image_path, end_frame_index, 1.0)]
        yield _report(f" -> De: {os.path.basename(start_image_path)} | Para: {os.path.basename(end_image_path)}")

        video_fragments.append(
            run_ltx_animation(scene_number, motion_prompt, conditioning_items_data, width, height, seed, cfg)
        )
        yield _report(f"Cena {scene_number} concluída.")

    # Final scene: animated freely, starting from the last keyframe.
    if num_keyframes > 0:
        last_scene_index = num_keyframes - 1
        last_image_path = keyframe_image_paths[last_scene_index]
        yield _report(f"Preparando Cena Final (Animação Livre) {num_keyframes}/{num_keyframes}...")

        conditioning_items_data = [(last_image_path, 0, 1.0)]
        yield _report(f" -> Ponto de Partida: {os.path.basename(last_image_path)}")

        video_fragments.append(
            run_ltx_animation(last_scene_index + 1, storyboard[last_scene_index], conditioning_items_data, width, height, seed, cfg)
        )
        yield _report("Cena Final concluída.")

    # Closing message is appended without a trailing newline.
    log_history += "\nProdução de todas as cenas de vídeo concluída!"
    yield {video_production_log_output: gr.update(value=log_history), fragment_list_state: video_fragments}
def concatenate_masterpiece(fragment_paths: list, progress=gr.Progress()):
    """Join the video fragments into one MP4 with FFmpeg's concat demuxer.

    Writes ``concat_list.txt`` into ``WORKSPACE_DIR`` (one ``file '<abs path>'``
    line per fragment) and runs FFmpeg with stream copy.

    Args:
        fragment_paths: Paths of the fragments, in playback order.
        progress: Gradio progress tracker.

    Returns:
        Path of the final video, ``<WORKSPACE_DIR>/obra_prima_final.mp4``.

    Raises:
        gr.Error: If there are no fragments, FFmpeg is not installed, or
            FFmpeg exits with an error.
    """
    if not fragment_paths:
        # Fail early with a clear message instead of an opaque FFmpeg error.
        raise gr.Error("Nenhum fragmento de vídeo para concatenar.")

    progress(0.5, desc="Montando a obra-prima final...")
    list_file_path = os.path.join(WORKSPACE_DIR, "concat_list.txt")
    final_output_path = os.path.join(WORKSPACE_DIR, "obra_prima_final.mp4")

    with open(list_file_path, "w", encoding="utf-8") as f:
        for path in fragment_paths:
            # Inside a single-quoted concat entry a literal quote must be
            # written as '\'' (close quote, escaped quote, reopen quote).
            escaped_path = os.path.abspath(path).replace("'", "'\\''")
            f.write(f"file '{escaped_path}'\n")

    # Argument list without a shell: paths containing spaces or shell
    # metacharacters are passed to FFmpeg unchanged.
    command = [
        "ffmpeg", "-y",
        "-f", "concat", "-safe", "0",
        "-i", list_file_path,
        "-c", "copy",
        final_output_path,
    ]
    try:
        subprocess.run(command, check=True, capture_output=True, text=True)
    except FileNotFoundError as e:
        # Without a shell, a missing executable raises instead of exiting 127.
        raise gr.Error("FFmpeg falhou ao unir os vídeos: executável 'ffmpeg' não encontrado.") from e
    except subprocess.CalledProcessError as e:
        raise gr.Error(f"FFmpeg falhou ao unir os vídeos: {e.stderr}") from e
    return final_output_path
# Gradio UI: four tabs (director, painter, production, post-production) plus
# the event wiring that connects the buttons to the functions above.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# LTX Video - Storyboard em Vídeo (ADUC-SDR)\n*By Carlex & Gemini & DreamO*")

    # State shared between the stages.
    storyboard_state = gr.State([])
    keyframe_images_state = gr.State([])
    fragment_list_state = gr.State([])

    # Start every run with an empty workspace directory.
    if os.path.exists(WORKSPACE_DIR):
        shutil.rmtree(WORKSPACE_DIR)
    os.makedirs(WORKSPACE_DIR)

    with gr.Tabs():
        with gr.TabItem("ETAPA 1: O DIRETOR (Roteiro Visual)"):
            with gr.Row():
                with gr.Column():
                    num_fragments_input = gr.Slider(2, 10, 4, step=1, label="Número de Cenas")
                    prompt_input = gr.Textbox(label="Ideia Geral (Prompt)")
                    image_input = gr.Image(type="filepath", label="Imagem de Referência Principal")
                    director_button = gr.Button("▶️ 1. Gerar Roteiro Visual", variant="primary")
                with gr.Column():
                    storyboard_to_show = gr.JSON(label="Roteiro Gerado (para visualização)")

        with gr.TabItem("ETAPA 2: O PINTOR (Imagens-Chave)"):
            with gr.Row():
                with gr.Column(scale=2):
                    gr.Markdown("### Controles do Pintor (DreamO)")
                    with gr.Row():
                        ref_image_1_input = gr.Image(label="Referência 1 (Principal)", type="filepath")
                        ref_image_2_input = gr.Image(label="Referência 2 (Opcional, para composição)", type="filepath")
                    with gr.Row():
                        ref_task_1_input = gr.Dropdown(choices=["ip", "id", "style"], value="ip", label="Tarefa para Referência 1")
                        ref_task_2_input = gr.Dropdown(choices=["ip", "id", "style"], value="ip", label="Tarefa para Referência 2")
                    photographer_button = gr.Button("▶️ 2. Pintar Imagens-Chave", variant="primary")
                    keyframe_log_output = gr.Textbox(label="Diário de Bordo do Pintor", lines=5, interactive=False)
                with gr.Column(scale=1):
                    keyframe_gallery_output = gr.Gallery(label="Imagens-Chave Pintadas", object_fit="contain", height="auto", type="filepath")

        with gr.TabItem("ETAPA 3: A PRODUÇÃO (Gerar Cenas em Vídeo)"):
            gr.Markdown(f"Gere o vídeo interpolando entre as imagens-chave. A resolução será a mesma da sua imagem de referência. Cada clipe terá **{VIDEO_DURATION_SECONDS} segundos a {VIDEO_FPS} FPS**.")
            with gr.Row():
                with gr.Column():
                    keyframes_to_render = gr.Gallery(label="Imagens-Chave para Animar", object_fit="contain", height="auto", interactive=False)
                    # Stays disabled until the keyframes have been painted.
                    animator_button = gr.Button("▶️ 3. Produzir Cenas em Vídeo", variant="primary", interactive=False)
                    video_production_log_output = gr.Textbox(label="Diário de Bordo da Produção", lines=10, interactive=False)
                with gr.Column():
                    # <<<< REMOVED FOR DEBUGGING >>>>
                    # fragment_gallery_output = gr.Gallery(label="Cenas Produzidas (Vídeos)", object_fit="contain", height="auto")
                    gr.Markdown("A galeria de vídeos foi desativada para depuração. Verifique o resultado na Etapa 4.")
            # NOTE(review): the nesting level of this Seed/CFG row could not be
            # recovered from the flattened source; confirm it belongs at tab
            # level rather than inside the right-hand column.
            with gr.Row():
                seed_number = gr.Number(42, label="Seed")
                cfg_slider = gr.Slider(1.0, 10.0, 2.5, step=0.1, label="CFG")

        with gr.TabItem("ETAPA 4: PÓS-PRODUÇÃO"):
            with gr.Row():
                with gr.Column():
                    editor_button = gr.Button("▶️ 4. Concatenar Vídeo Final", variant="primary")
                    final_fragments_display = gr.JSON(label="Fragmentos a Concatenar")
                with gr.Column():
                    final_video_output = gr.Video(label="A Obra-Prima Final")

    # --- Act 5: The Conducting (button wiring) ---
    def director_success(storyboard_list, img_path):
        """Publish a freshly generated storyboard and seed painter reference 1.

        Raises:
            gr.Error: If the storyboard is empty.
        """
        if not storyboard_list:
            raise gr.Error("O storyboard está vazio ou em formato inválido.")
        return {storyboard_state: storyboard_list, storyboard_to_show: gr.update(value=storyboard_list), ref_image_1_input: gr.update(value=img_path)}

    # `.success()` instead of `.then()`: the follow-up must run only when the
    # director call succeeded. With `.then()` a failed call would re-publish a
    # stale storyboard, or raise a second error that hides the real one.
    director_button.click(
        fn=get_storyboard_from_director,
        inputs=[num_fragments_input, prompt_input, image_input],
        outputs=[storyboard_state]
    ).success(
        fn=director_success,
        inputs=[storyboard_state, image_input],
        outputs=[storyboard_state, storyboard_to_show, ref_image_1_input]
    )

    # `.success()` here as well: the production button is unlocked only after
    # every keyframe was painted, not after a partial or failed run.
    photographer_button.click(
        fn=run_keyframe_generation,
        inputs=[storyboard_state, ref_image_1_input, ref_image_2_input, ref_task_1_input, ref_task_2_input],
        outputs=[keyframe_log_output, keyframe_gallery_output, keyframe_images_state]
    ).success(
        lambda paths: {keyframes_to_render: gr.update(value=paths), animator_button: gr.update(interactive=True)},
        inputs=[keyframe_images_state],
        outputs=[keyframes_to_render, animator_button]
    )

    # <<<< CLICK CALL SIMPLIFIED FOR DEBUGGING >>>>
    # `.then()` is kept on purpose: fragments finished before a failure are
    # still listed for post-production.
    animator_button.click(
        fn=run_full_video_production,
        inputs=[storyboard_state, keyframe_images_state, seed_number, cfg_slider],
        outputs=[video_production_log_output, fragment_list_state]
    ).then(
        lambda paths: gr.update(value=paths),
        inputs=[fragment_list_state],
        outputs=[final_fragments_display]
    )

    editor_button.click(
        fn=concatenate_masterpiece,
        inputs=[fragment_list_state],
        outputs=[final_video_output]
    )
if __name__ == "__main__":
    # Enable the request queue, then serve on all network interfaces with a
    # public share link requested.
    launch_options = {"server_name": "0.0.0.0", "share": True}
    demo.queue().launch(**launch_options)