# --- app.py (The Maestro's Control Panel - Focused Debugging) ---
# By Carlex & Gemini & DreamO
# ... (imports and initializations unchanged) ...
import gradio as gr
import torch
import os
import yaml
from PIL import Image
import shutil
import gc
import subprocess
import math
import google.generativeai as genai
import numpy as np
import imageio
from pathlib import Path
import huggingface_hub
import json
from inference import create_ltx_video_pipeline, load_image_to_tensor_with_resize_and_crop, seed_everething, calculate_padding
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem
from dreamo_helpers import dreamo_generator_singleton
# ... (configuration and constants unchanged) ...
config_file_path = "configs/ltxv-13b-0.9.8-distilled.yaml"
with open(config_file_path, "r") as file:
    PIPELINE_CONFIG_YAML = yaml.safe_load(file)
LTX_REPO = "Lightricks/LTX-Video"
models_dir = "downloaded_models_gradio_cpu_init"
Path(models_dir).mkdir(parents=True, exist_ok=True)
WORKSPACE_DIR = "aduc_workspace"
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
VIDEO_WIDTH = 720
VIDEO_HEIGHT = 720
VIDEO_FPS = 24
VIDEO_DURATION_SECONDS = 4
VIDEO_TOTAL_FRAMES = VIDEO_DURATION_SECONDS * VIDEO_FPS
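# The rounding in run_ltx_animation reflects LTX-Video's requirement that
# num_frames be of the form 8*n + 1, so the nominal 96 frames (4 s x 24 fps)
# become a valid count before generation and are cropped back afterwards:
#   round((96 - 1) / 8) = 12  ->  12 * 8 + 1 = 97 frames generated,
#   then the decoded video is trimmed back to 96 frames.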
print("Baixando e criando pipelines LTX na CPU...")
distilled_model_actual_path = huggingface_hub.hf_hub_download(repo_id=LTX_REPO, filename=PIPELINE_CONFIG_YAML["checkpoint_path"], local_dir=models_dir, local_dir_use_symlinks=False)
pipeline_instance = create_ltx_video_pipeline(ckpt_path=distilled_model_actual_path, precision=PIPELINE_CONFIG_YAML["precision"], text_encoder_model_name_or_path=PIPELINE_CONFIG_YAML["text_encoder_model_name_or_path"], sampler=PIPELINE_CONFIG_YAML["sampler"], device='cpu')
print("Modelos LTX prontos (na CPU).")
# --- Act 3: The Musicians' Scores (Functions) ---
# ... (get_storyboard_from_director and run_keyframe_generation unchanged) ...
def get_storyboard_from_director(num_fragments: int, prompt: str, initial_image_path: str, progress=gr.Progress()):
    progress(0.5, desc="[Director Gemini] Creating the storyboard...")
    if not initial_image_path:
        raise gr.Error("Please provide an initial reference image.")
    if not GEMINI_API_KEY:
        raise gr.Error("Gemini API key is not configured!")
    genai.configure(api_key=GEMINI_API_KEY)
    try:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        prompt_file_path = os.path.join(script_dir, "prompts", "director_storyboard_v2.txt")
        with open(prompt_file_path, "r", encoding="utf-8") as f:
            template = f.read()
    except FileNotFoundError:
        raise gr.Error(f"Prompt file not found at '{prompt_file_path}'!")
    director_prompt = template.format(user_prompt=prompt, num_fragments=int(num_fragments))
    model = genai.GenerativeModel('gemini-2.5-flash')
    img = Image.open(initial_image_path)
    response = model.generate_content([director_prompt, img])
    try:
        cleaned_response = response.text.strip().replace("```json", "").replace("```", "")
        if not cleaned_response:
            raise ValueError("The Gemini response was empty after cleanup.")
        storyboard_data = json.loads(cleaned_response)
        return storyboard_data.get("storyboard", [])
    except (json.JSONDecodeError, ValueError) as e:
        raise gr.Error(f"The Director returned an invalid response. Error: {e}. Raw response: '{response.text}'")
def run_keyframe_generation(storyboard, ref_img_path_1, ref_img_path_2, ref_task_1, ref_task_2):
    if not storyboard:
        raise gr.Error("No storyboard to generate keyframes from.")
    if not ref_img_path_1:
        raise gr.Error("Reference 1 is required.")
    with Image.open(ref_img_path_1) as img:
        width, height = img.size
        # Snap dimensions down to the nearest multiple of 32 for the generator.
        width = (width // 32) * 32
        height = (height // 32) * 32
    keyframe_paths, log_history = [], ""
    try:
        dreamo_generator_singleton.to_gpu()
        for i, prompt in enumerate(storyboard):
            log_message = f"Painting Scene {i+1}/{len(storyboard)} with DreamO ({width}x{height})..."
            log_history += log_message + "\n"
            yield {keyframe_log_output: gr.update(value=log_history)}
            output_path = os.path.join(WORKSPACE_DIR, f"keyframe_image_{i+1}.png")
            image = dreamo_generator_singleton.generate_image_with_gpu_management(
                ref_image1_np=np.array(Image.open(ref_img_path_1).convert("RGB")) if ref_img_path_1 else None,
                ref_image2_np=np.array(Image.open(ref_img_path_2).convert("RGB")) if ref_img_path_2 else None,
                ref_task1=ref_task_1, ref_task2=ref_task_2,
                prompt=prompt, width=width, height=height
            )
            image.save(output_path)
            keyframe_paths.append(output_path)
            log_message = f"Scene {i+1} painted."
            log_history += log_message + "\n"
            yield {keyframe_log_output: gr.update(value=log_history), keyframe_gallery_output: gr.update(value=keyframe_paths), keyframe_images_state: keyframe_paths}
    finally:
        dreamo_generator_singleton.to_cpu()
    log_history += "\nAll scenes painted!"
    yield {keyframe_log_output: gr.update(value=log_history)}
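# Note: run_keyframe_generation (above) and run_full_video_production (below)
# are generators. Gradio treats each yielded {component: gr.update(...)} dict
# as a streaming update, which is what lets the logbooks fill in live while
# the GPU works.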
def run_ltx_animation(current_fragment_index, motion_prompt, conditioning_items_data, width, height, seed, cfg, progress=gr.Progress()):
    # ... (code unchanged)
    progress(0, desc=f"[LTX Animator] Generating Scene {current_fragment_index}...")
    output_path = os.path.join(WORKSPACE_DIR, f"fragment_{current_fragment_index}.mp4")
    target_device = 'cuda' if torch.cuda.is_available() else 'cpu'
    try:
        pipeline_instance.to(target_device)
        conditioning_items = []
        for (path, start_frame, strength) in conditioning_items_data:
            tensor = load_image_to_tensor_with_resize_and_crop(path, height, width)
            conditioning_items.append(ConditioningItem(tensor.to(target_device), start_frame, strength))
        # Round the frame count to the 8*n + 1 form the pipeline requires.
        n_val = round((float(VIDEO_TOTAL_FRAMES) - 1.0) / 8.0)
        actual_num_frames = int(n_val * 8 + 1)
        # Pad spatial dimensions up to the next multiple of 32.
        padded_h, padded_w = ((height - 1) // 32 + 1) * 32, ((width - 1) // 32 + 1) * 32
        padding_vals = calculate_padding(height, width, padded_h, padded_w)
        for cond_item in conditioning_items:
            cond_item.media_item = torch.nn.functional.pad(cond_item.media_item, padding_vals)
        timesteps = PIPELINE_CONFIG_YAML.get("first_pass", {}).get("timesteps")
        kwargs = {
            "prompt": motion_prompt, "negative_prompt": "blurry, distorted, bad quality, artifacts",
            "height": padded_h, "width": padded_w, "num_frames": actual_num_frames, "frame_rate": VIDEO_FPS,
            "generator": torch.Generator(device=target_device).manual_seed(int(seed) + current_fragment_index),
            "output_type": "pt", "guidance_scale": float(cfg), "timesteps": timesteps,
            "conditioning_items": conditioning_items, "vae_per_channel_normalize": True,
            "decode_timestep": PIPELINE_CONFIG_YAML["decode_timestep"],
            "decode_noise_scale": PIPELINE_CONFIG_YAML["decode_noise_scale"],
            "stochastic_sampling": PIPELINE_CONFIG_YAML["stochastic_sampling"],
            "image_cond_noise_scale": 0.15, "is_video": True,
            "mixed_precision": (PIPELINE_CONFIG_YAML["precision"] == "mixed_precision"),
            "offload_to_cpu": False, "enhance_prompt": False,
        }
        result_tensor = pipeline_instance(**kwargs).images
        # Undo the spatial padding and trim any extra frames.
        pad_l, pad_r, pad_t, pad_b = padding_vals
        slice_h = -pad_b if pad_b > 0 else None
        slice_w = -pad_r if pad_r > 0 else None
        cropped_tensor = result_tensor[:, :, :VIDEO_TOTAL_FRAMES, pad_t:slice_h, pad_l:slice_w]
        video_np = (cropped_tensor[0].permute(1, 2, 3, 0).cpu().float().numpy() * 255).astype(np.uint8)
        with imageio.get_writer(output_path, fps=VIDEO_FPS, codec='libx264', quality=8) as writer:
            for i, frame in enumerate(video_np):
                progress(i / len(video_np), desc=f"Rendering frame {i+1}/{len(video_np)}...")
                writer.append_data(frame)
        return output_path
    finally:
        pipeline_instance.to('cpu')
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
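# Worked example of the padding round-trip above for a 720x720 keyframe:
# ((720 - 1) // 32 + 1) * 32 = 736, so the pipeline renders at 736x736 and
# the final slice restores 720x720. The unpacking of padding_vals assumes
# calculate_padding returns (pad_left, pad_right, pad_top, pad_bottom), the
# same ordering torch.nn.functional.pad expects.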
# <<<< FUNÇÃO DE PRODUÇÃO SIMPLIFICADA PARA DEPURAÇÃO >>>>
def run_full_video_production(storyboard, keyframe_image_paths, seed, cfg):
    if not storyboard or not keyframe_image_paths:
        raise gr.Error("Storyboard and/or keyframes are missing.")
    if len(storyboard) != len(keyframe_image_paths):
        raise gr.Error("The number of storyboard prompts and keyframes does not match.")
    with Image.open(keyframe_image_paths[0]) as img:
        width, height = img.size
    video_fragments, log_history = [], ""
    num_keyframes = len(keyframe_image_paths)
    n_val = round((float(VIDEO_TOTAL_FRAMES) - 1.0) / 8.0)
    actual_num_frames = int(n_val * 8 + 1)
    end_frame_index = actual_num_frames - 1
    for i in range(num_keyframes - 1):
        # ... (interpolation logic unchanged)
        motion_prompt = storyboard[i]
        start_image_path = keyframe_image_paths[i]
        end_image_path = keyframe_image_paths[i+1]
        log_message = f"Preparing Interpolation Scene {i+1}/{num_keyframes}..."
        log_history += log_message + "\n"
        yield {video_production_log_output: gr.update(value=log_history), fragment_list_state: video_fragments}
        # Condition the clip on its start keyframe (frame 0) and end keyframe (last frame).
        conditioning_items_data = [(start_image_path, 0, 1.0), (end_image_path, end_frame_index, 1.0)]
        log_message = f" -> From: {os.path.basename(start_image_path)} | To: {os.path.basename(end_image_path)}"
        log_history += log_message + "\n"
        yield {video_production_log_output: gr.update(value=log_history), fragment_list_state: video_fragments}
        fragment_path = run_ltx_animation(i + 1, motion_prompt, conditioning_items_data, width, height, seed, cfg)
        video_fragments.append(fragment_path)
        log_message = f"Scene {i+1} finished."
        log_history += log_message + "\n"
        yield {video_production_log_output: gr.update(value=log_history), fragment_list_state: video_fragments}
    if num_keyframes > 0:
        # ... (final-scene logic unchanged)
        last_scene_index = num_keyframes - 1
        last_motion_prompt = storyboard[last_scene_index]
        last_image_path = keyframe_image_paths[last_scene_index]
        log_message = f"Preparing Final Scene (Free Animation) {num_keyframes}/{num_keyframes}..."
        log_history += log_message + "\n"
        yield {video_production_log_output: gr.update(value=log_history), fragment_list_state: video_fragments}
        # The last clip is conditioned only on its start keyframe.
        conditioning_items_data = [(last_image_path, 0, 1.0)]
        log_message = f" -> Starting point: {os.path.basename(last_image_path)}"
        log_history += log_message + "\n"
        yield {video_production_log_output: gr.update(value=log_history), fragment_list_state: video_fragments}
        fragment_path = run_ltx_animation(last_scene_index + 1, last_motion_prompt, conditioning_items_data, width, height, seed, cfg)
        video_fragments.append(fragment_path)
        log_message = "Final Scene finished."
        log_history += log_message + "\n"
        yield {video_production_log_output: gr.update(value=log_history), fragment_list_state: video_fragments}
    log_history += "\nAll video scenes produced!"
    yield {video_production_log_output: gr.update(value=log_history), fragment_list_state: video_fragments}
def concatenate_masterpiece(fragment_paths: list, progress=gr.Progress()):
    # ... (code unchanged)
    progress(0.5, desc="Assembling the final masterpiece...")
    list_file_path = os.path.join(WORKSPACE_DIR, "concat_list.txt")
    final_output_path = os.path.join(WORKSPACE_DIR, "obra_prima_final.mp4")
    with open(list_file_path, "w") as f:
        for path in fragment_paths:
            f.write(f"file '{os.path.abspath(path)}'\n")
    # Pass an argument list instead of a shell string so paths with spaces survive.
    command = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_file_path, "-c", "copy", final_output_path]
    try:
        subprocess.run(command, check=True, capture_output=True, text=True)
        return final_output_path
    except subprocess.CalledProcessError as e:
        raise gr.Error(f"FFmpeg failed to join the videos: {e.stderr}")
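# The concat list consumed by ffmpeg's concat demuxer is plain text with one
# entry per fragment, e.g.:
#
#   file '/abs/path/aduc_workspace/fragment_1.mp4'
#   file '/abs/path/aduc_workspace/fragment_2.mp4'
#
# `-c copy` stitches without re-encoding, which works here because every
# fragment shares the same codec, resolution, and frame rate.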
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # ... (UI unchanged)
    gr.Markdown("# LTX Video - Video Storyboard (ADUC-SDR)\n*By Carlex & Gemini & DreamO*")
    storyboard_state = gr.State([])
    keyframe_images_state = gr.State([])
    fragment_list_state = gr.State([])
    if os.path.exists(WORKSPACE_DIR):
        shutil.rmtree(WORKSPACE_DIR)
    os.makedirs(WORKSPACE_DIR)
    with gr.Tabs():
        with gr.TabItem("STEP 1: THE DIRECTOR (Visual Storyboard)"):
            # ... (UI unchanged)
            with gr.Row():
                with gr.Column():
                    num_fragments_input = gr.Slider(2, 10, 4, step=1, label="Number of Scenes")
                    prompt_input = gr.Textbox(label="General Idea (Prompt)")
                    image_input = gr.Image(type="filepath", label="Main Reference Image")
                    director_button = gr.Button("▶️ 1. Generate Visual Storyboard", variant="primary")
                with gr.Column():
                    storyboard_to_show = gr.JSON(label="Generated Storyboard (for viewing)")
        with gr.TabItem("STEP 2: THE PAINTER (Keyframes)"):
            # ... (UI unchanged)
            with gr.Row():
                with gr.Column(scale=2):
                    gr.Markdown("### Painter Controls (DreamO)")
                    with gr.Row():
                        ref_image_1_input = gr.Image(label="Reference 1 (Main)", type="filepath")
                        ref_image_2_input = gr.Image(label="Reference 2 (Optional, for composition)", type="filepath")
                    with gr.Row():
                        ref_task_1_input = gr.Dropdown(choices=["ip", "id", "style"], value="ip", label="Task for Reference 1")
                        ref_task_2_input = gr.Dropdown(choices=["ip", "id", "style"], value="ip", label="Task for Reference 2")
                    photographer_button = gr.Button("▶️ 2. Paint Keyframes", variant="primary")
                    keyframe_log_output = gr.Textbox(label="Painter's Logbook", lines=5, interactive=False)
                with gr.Column(scale=1):
                    keyframe_gallery_output = gr.Gallery(label="Painted Keyframes", object_fit="contain", height="auto", type="filepath")
        with gr.TabItem("STEP 3: THE PRODUCTION (Generate Video Scenes)"):
            # ... (UI unchanged)
            gr.Markdown(f"Generate the video by interpolating between keyframes. The resolution will match your reference image. Each clip will be **{VIDEO_DURATION_SECONDS} seconds at {VIDEO_FPS} FPS**.")
            with gr.Row():
                with gr.Column():
                    keyframes_to_render = gr.Gallery(label="Keyframes to Animate", object_fit="contain", height="auto", interactive=False)
                    animator_button = gr.Button("▶️ 3. Produce Video Scenes", variant="primary", interactive=False)
                    video_production_log_output = gr.Textbox(label="Production Logbook", lines=10, interactive=False)
                with gr.Column():
                    # <<<< REMOVED FOR DEBUGGING >>>>
                    # fragment_gallery_output = gr.Gallery(label="Produced Scenes (Videos)", object_fit="contain", height="auto")
                    gr.Markdown("The video gallery has been disabled for debugging. Check the result in Step 4.")
                    with gr.Row():
                        seed_number = gr.Number(42, label="Seed")
                        cfg_slider = gr.Slider(1.0, 10.0, 2.5, step=0.1, label="CFG")
        with gr.TabItem("STEP 4: POST-PRODUCTION"):
            # ... (UI unchanged)
            with gr.Row():
                with gr.Column():
                    editor_button = gr.Button("▶️ 4. Concatenate Final Video", variant="primary")
                    final_fragments_display = gr.JSON(label="Fragments to Concatenate")
                with gr.Column():
                    final_video_output = gr.Video(label="The Final Masterpiece")
    # --- Act 5: The Conducting (Button Wiring Logic) ---
    def director_success(storyboard_list, img_path):
        # ... (logic unchanged)
        if not storyboard_list:
            raise gr.Error("The storyboard is empty or in an invalid format.")
        return {storyboard_state: storyboard_list, storyboard_to_show: gr.update(value=storyboard_list), ref_image_1_input: gr.update(value=img_path)}
    director_button.click(
        fn=get_storyboard_from_director,
        inputs=[num_fragments_input, prompt_input, image_input],
        outputs=[storyboard_state]
    ).then(
        fn=director_success,
        inputs=[storyboard_state, image_input],
        outputs=[storyboard_state, storyboard_to_show, ref_image_1_input]
    )
    photographer_button.click(
        fn=run_keyframe_generation,
        inputs=[storyboard_state, ref_image_1_input, ref_image_2_input, ref_task_1_input, ref_task_2_input],
        outputs=[keyframe_log_output, keyframe_gallery_output, keyframe_images_state]
    ).then(
        lambda paths: {keyframes_to_render: gr.update(value=paths), animator_button: gr.update(interactive=True)},
        inputs=[keyframe_images_state],
        outputs=[keyframes_to_render, animator_button]
    )
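    # Each .click(...).then(...) chain runs the heavy worker first, then a
    # lightweight lambda that fans the stored result out to follow-up
    # components (enabling the next stage's button, filling galleries, etc.).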
# <<<< CHAMADA DE CLICK SIMPLIFICADA PARA DEPURAÇÃO >>>>
    animator_button.click(
        fn=run_full_video_production,
        inputs=[storyboard_state, keyframe_images_state, seed_number, cfg_slider],
        outputs=[video_production_log_output, fragment_list_state]
    ).then(
        lambda paths: gr.update(value=paths),
        inputs=[fragment_list_state],
        outputs=[final_fragments_display]
    )
    editor_button.click(
        fn=concatenate_masterpiece,
        inputs=[fragment_list_state],
        outputs=[final_video_output]
    )
if __name__ == "__main__":
    demo.queue().launch(server_name="0.0.0.0", share=True)