Spaces:
Running
Running
| # app.py | |
| import os | |
| import oss2 | |
| import sys | |
| import uuid | |
| import shutil | |
| import time | |
| import gradio as gr | |
| import requests | |
| os.system("pip install dashscope") | |
| import dashscope | |
| from dashscope.utils.oss_utils import check_and_upload_local | |
| DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY") | |
| dashscope.api_key = DASHSCOPE_API_KEY | |
| class WanS2VApp: | |
| def __init__(self): | |
| pass | |
| def predict( | |
| self, | |
| ref_img, | |
| audio, | |
| resolution="480P", | |
| style="speech", | |
| ): | |
| # Upload files to OSS if needed and get URLs | |
| _, image_url = check_and_upload_local("wan2.2-s2v", ref_img, DASHSCOPE_API_KEY) | |
| _, audio_url = check_and_upload_local("wan2.2-s2v", audio, DASHSCOPE_API_KEY) | |
| # Prepare the request payload | |
| payload = { | |
| "model": "wan2.2-s2v", | |
| "input": { | |
| "image_url": image_url, | |
| "audio_url": audio_url | |
| }, | |
| "parameters": { | |
| "style": style, | |
| "resolution": resolution, | |
| } | |
| } | |
| # Set up headers | |
| headers = { | |
| "X-DashScope-Async": "enable", | |
| "X-DashScope-OssResourceResolve": "enable", | |
| "Authorization": f"Bearer {DASHSCOPE_API_KEY}", | |
| "Content-Type": "application/json" | |
| } | |
| # Make the initial API request | |
| url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/image2video/video-synthesis/" | |
| response = requests.post(url, json=payload, headers=headers) | |
| # Check if request was successful | |
| if response.status_code != 200: | |
| raise Exception(f"Initial request failed with status code {response.status_code}: {response.text}") | |
| # Get the task ID from response | |
| result = response.json() | |
| task_id = result.get("output", {}).get("task_id") | |
| if not task_id: | |
| raise Exception("Failed to get task ID from response") | |
| # Poll for results | |
| get_url = f"https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}" | |
| headers = { | |
| "Authorization": f"Bearer {DASHSCOPE_API_KEY}", | |
| "Content-Type": "application/json" | |
| } | |
| while True: | |
| response = requests.get(get_url, headers=headers) | |
| if response.status_code != 200: | |
| raise Exception(f"Failed to get task status: {response.status_code}: {response.text}") | |
| result = response.json() | |
| print(result) | |
| task_status = result.get("output", {}).get("task_status") | |
| if task_status == "SUCCEEDED": | |
| # Task completed successfully, return video URL | |
| video_url = result["output"]["results"]["video_url"] | |
| return video_url | |
| elif task_status == "FAILED": | |
| # Task failed, raise an exception with error message | |
| error_msg = result.get("output", {}).get("message", "Unknown error") | |
| raise Exception(f"Task failed: {error_msg}") | |
| else: | |
| # Task is still running, wait and retry | |
| time.sleep(5) # Wait 5 seconds before polling again | |
| def start_app(): | |
| import argparse | |
| parser = argparse.ArgumentParser(description="Wan2.2-S2V ่ง้ข็ๆๅทฅๅ ท") | |
| args = parser.parse_args() | |
| app = WanS2VApp() | |
| with gr.Blocks(title="Wan2.2-S2V ่ง้ข็ๆ") as demo: | |
| # gr.Markdown("# Wan2.2-S2V ่ง้ข็ๆๅทฅๅ ท") | |
| gr.HTML(""" | |
| <div style="text-align: center; font-size: 32px; font-weight: bold; margin-bottom: 20px;"> | |
| Wan2.2-S2V | |
| </div> | |
| """) | |
| gr.Markdown("ๅบไบ้ณ้ขๅๅ่ๅพๅ็ๆ่ง้ข") | |
| with gr.Row(): | |
| with gr.Column(): | |
| ref_img = gr.Image( | |
| label="Input image(่พๅ ฅๅพๅ)", | |
| type="filepath", | |
| sources=["upload"], | |
| ) | |
| audio = gr.Audio( | |
| label="Audio(้ณ้ขๆไปถ)", | |
| type="filepath", | |
| sources=["upload"], | |
| ) | |
| resolution = gr.Dropdown( | |
| label="Resolution(ๅ่พจ็)", | |
| choices=["480P", "720P"], | |
| value="480P", | |
| info="Inference Resolution, default: 480P(ๆจ็ๅ่พจ็๏ผ้ป่ฎค480P)" | |
| ) | |
| run_button = gr.Button("Generate Video(็ๆ่ง้ข)") | |
| with gr.Column(): | |
| output_video = gr.Video(label="Output Video(่พๅบ่ง้ข)") | |
| run_button.click( | |
| fn=app.predict, | |
| inputs=[ | |
| ref_img, | |
| audio, | |
| resolution, | |
| ], | |
| outputs=[output_video], | |
| ) | |
| examples_dir = "examples" | |
| if os.path.exists(examples_dir): | |
| example_data = [] | |
| files_dict = {} | |
| for file in os.listdir(examples_dir): | |
| file_path = os.path.join(examples_dir, file) | |
| name, ext = os.path.splitext(file) | |
| if ext.lower() in [".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".webp"]: | |
| if name not in files_dict: | |
| files_dict[name] = {} | |
| files_dict[name]["image"] = file_path | |
| elif ext.lower() in [".mp3", ".wav"]: | |
| if name not in files_dict: | |
| files_dict[name] = {} | |
| files_dict[name]["audio"] = file_path | |
| for name, files in files_dict.items(): | |
| if "image" in files and "audio" in files: | |
| example_data.append([ | |
| files["image"], | |
| files["audio"], | |
| "480P" | |
| ]) | |
| if example_data: | |
| gr.Examples( | |
| examples=example_data, | |
| inputs=[ref_img, audio, resolution], | |
| outputs=output_video, | |
| fn=app.predict, | |
| cache_examples=False, | |
| ) | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860 | |
| ) | |
| if __name__ == "__main__": | |
| start_app() |