Spaces:
Running
Running
File size: 19,209 Bytes
6f65703 8721ec2 6f65703 8721ec2 6f65703 b9247d3 6f65703 b9247d3 6f65703 b9247d3 6f65703 b9247d3 6f65703 b9247d3 6f65703 b9247d3 6f65703 b9247d3 6f65703 b9247d3 6f65703 b9247d3 6f65703 b9247d3 6f65703 b9247d3 1a8b29b b9247d3 6f65703 b9247d3 6f65703 b9247d3 6f65703 b9247d3 6f65703 46c5397 6f65703 46c5397 6f65703 46c5397 6f65703 b9247d3 6f65703 46c5397 6f65703 46c5397 6f65703 46c5397 6f65703 8721ec2 6f65703 b9247d3 8721ec2 b9247d3 6f65703 8721ec2 6f65703 b9247d3 6f65703 b9247d3 8721ec2 b9247d3 8721ec2 b9247d3 8721ec2 b9247d3 8721ec2 6f65703 b9247d3 6f65703 b9247d3 6f65703 b9247d3 6f65703 b9247d3 6f65703 46c5397 6f65703 8721ec2 b9247d3 8721ec2 b9247d3 6f65703 |
|
import os
import sys
import cv2
import json
import random
import time
import datetime
import requests
import func_timeout
import numpy as np
import gradio as gr
import boto3
import tempfile
from botocore.client import Config
from PIL import Image
# TOKEN = os.environ['TOKEN']
# APIKEY = os.environ['APIKEY']
# UKAPIURL = os.environ['UKAPIURL']
OneKey = os.environ['OneKey'].strip()
OneKey = OneKey.split("#")
TOKEN = OneKey[0]
APIKEY = OneKey[1]
UKAPIURL = OneKey[2]
LLMKEY = OneKey[3]
R2_ACCESS_KEY = OneKey[4]
R2_SECRET_KEY = OneKey[5]
R2_ENDPOINT = OneKey[6]
tmpFolder = "tmp"
os.makedirs(tmpFolder, exist_ok=True)
def upload_user_img(clientIp, timeId, img):
fileName = clientIp.replace(".", "")+str(timeId)+".jpg"
local_path = os.path.join(tmpFolder, fileName)
img = cv2.imread(img)
cv2.imwrite(os.path.join(tmpFolder, fileName), img)
json_data = {
"token": TOKEN,
"input1": fileName,
"input2": "",
"protocol": "",
"cloud": "ali"
}
session = requests.session()
ret = requests.post(
f"{UKAPIURL}/upload",
headers={'Content-Type': 'application/json'},
json=json_data
)
res = ""
if ret.status_code==200:
if 'upload1' in ret.json():
upload_url = ret.json()['upload1']
headers = {'Content-Type': 'image/jpeg'}
response = session.put(upload_url, data=open(local_path, 'rb').read(), headers=headers)
# print(response.status_code)
if response.status_code == 200:
res = upload_url
if os.path.exists(local_path):
os.remove(local_path)
return res
class R2Api:
def __init__(self, session=None):
super().__init__()
self.R2_BUCKET = "trump-ai-voice"
self.domain = "https://www.trumpaivoice.net/"
self.R2_ACCESS_KEY = R2_ACCESS_KEY
self.R2_SECRET_KEY = R2_SECRET_KEY
self.R2_ENDPOINT = R2_ENDPOINT
self.client = boto3.client(
"s3",
endpoint_url=self.R2_ENDPOINT,
aws_access_key_id=self.R2_ACCESS_KEY,
aws_secret_access_key=self.R2_SECRET_KEY,
config=Config(signature_version="s3v4")
)
self.session = requests.Session() if session is None else session
def upload_file(self, local_path, cloud_path):
t1 = time.time()
head_dict = {
'jpg': 'image/jpeg',
'jpeg': 'image/jpeg',
'png': 'image/png',
'gif': 'image/gif',
'bmp': 'image/bmp',
'webp': 'image/webp',
'ico': 'image/x-icon'
}
ftype = os.path.basename(local_path).split(".")[-1].lower()
ctype = head_dict.get(ftype, 'application/octet-stream')
headers = {"Content-Type": ctype}
cloud_path = f"QwenImageEdit/Uploads/{str(datetime.date.today())}/{os.path.basename(local_path)}"
url = self.client.generate_presigned_url(
"put_object",
Params={"Bucket": self.R2_BUCKET, "Key": cloud_path, "ContentType": ctype},
ExpiresIn=604800
)
retry_count = 0
while retry_count < 3:
try:
with open(local_path, 'rb') as f:
self.session.put(url, data=f.read(), headers=headers, timeout=8)
break
except (requests.exceptions.Timeout, requests.exceptions.RequestException):
retry_count += 1
if retry_count == 3:
raise Exception('Failed to upload file to R2 after 3 retries!')
continue
print("upload_file time is ====>", time.time() - t1)
return f"{self.domain}{cloud_path}"
def upload_user_img_r2(clientIp, timeId, img):
fileName = clientIp.replace(".", "")+str(timeId)+".jpg"
local_path = os.path.join(tmpFolder, fileName)
img = cv2.imread(img)
cv2.imwrite(os.path.join(tmpFolder, fileName), img)
res = R2Api().upload_file(local_path, fileName)
if os.path.exists(local_path):
os.remove(local_path)
return res
@func_timeout.func_set_timeout(10)
def get_country_info(ip):
"""Get country information for IP address"""
try:
# Use the new API URL
url = f"https://qifu-api.baidubce.com/ip/geo/v1/district?ip={ip}"
ret = requests.get(url)
ret.raise_for_status() # Raises exception if request fails (e.g. 404, 500)
json_data = ret.json()
# Based on new JSON structure, country info is under 'data' -> 'country' path
if json_data.get("code") == "Success":
country = json_data.get("data", {}).get("country")
return country if country else "Unknown"
else:
# Handle API error codes
print(f"API request failed: {json_data.get('msg', 'Unknown error')}")
return "Unknown"
except requests.exceptions.RequestException as e:
print(f"Network request failed: {e}")
return "Unknown"
except Exception as e:
print(f"Failed to get IP location: {e}")
return "Unknown"
def get_country_info_safe(ip):
"""Safely get IP location info, returns Unknown on error"""
try:
return get_country_info(ip)
except func_timeout.FunctionTimedOut:
print(f"IP location request timeout: {ip}")
return "Unknown"
except Exception as e:
print(f"Failed to get IP location: {e}")
return "Unknown"
def create_mask_from_layers(base_image, layers):
"""
Create mask image from ImageEditor layers
Args:
base_image (PIL.Image): Original image
layers (list): ImageEditor layer data
Returns:
PIL.Image: Black and white mask image
"""
from PIL import Image, ImageDraw
import numpy as np
# Create blank mask with same size as original image
mask = Image.new('L', base_image.size, 0) # 'L' mode is grayscale, 0 is black
if not layers:
return mask
# Iterate through all layers, set drawn areas to white
for layer in layers:
if layer is not None:
# Convert layer to numpy array
layer_array = np.array(layer)
# Check layer format
if len(layer_array.shape) == 3: # RGB/RGBA format
# If RGBA, check alpha channel
if layer_array.shape[2] == 4:
# Use alpha channel as mask
alpha_channel = layer_array[:, :, 3]
# Set non-transparent areas (alpha > 0) to white
mask_array = np.where(alpha_channel > 0, 255, 0).astype(np.uint8)
else:
# RGB format, check if not pure black (0,0,0)
# Assume drawn areas are non-black
non_black = np.any(layer_array > 0, axis=2)
mask_array = np.where(non_black, 255, 0).astype(np.uint8)
elif len(layer_array.shape) == 2: # Grayscale
# Use grayscale values directly, set non-zero areas to white
mask_array = np.where(layer_array > 0, 255, 0).astype(np.uint8)
else:
continue
# Convert mask_array to PIL image and merge into total mask
layer_mask = Image.fromarray(mask_array, mode='L')
# Resize to match original image
if layer_mask.size != base_image.size:
layer_mask = layer_mask.resize(base_image.size, Image.LANCZOS)
# Merge masks (use maximum value to ensure all drawn areas are included)
mask_array_current = np.array(mask)
layer_mask_array = np.array(layer_mask)
combined_mask_array = np.maximum(mask_array_current, layer_mask_array)
mask = Image.fromarray(combined_mask_array, mode='L')
return mask
def upload_mask_image_r2(client_ip, time_id, mask_image):
"""
Upload mask image to R2
Args:
client_ip (str): Client IP
time_id (int): Timestamp
mask_image (PIL.Image): Mask image
Returns:
str: Uploaded URL
"""
file_name = f"{client_ip.replace('.', '')}{time_id}_mask.png"
local_path = os.path.join(tmpFolder, file_name)
try:
# Save mask image as PNG format (supports transparency)
mask_image.save(local_path, 'PNG')
# Upload to R2
res = R2Api().upload_file(local_path, file_name)
return res
except Exception as e:
print(f"Failed to upload mask image: {e}")
return None
finally:
# Clean up local files
if os.path.exists(local_path):
os.remove(local_path)
def submit_image_edit_task(user_image_url, prompt, task_type="80", mask_image_url=""):
"""
Submit image editing task
"""
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {APIKEY}'
}
data = {
"user_image": user_image_url,
"mask_image": mask_image_url,
"task_type": task_type,
"prompt": prompt,
"secret_key": "219ngu",
"is_private": "0"
}
try:
response = requests.post(
f'{UKAPIURL}/public_image_edit',
headers=headers,
json=data
)
if response.status_code == 200:
result = response.json()
if result.get('code') == 0:
return result['data']['task_id'], None
else:
return None, f"API Error: {result.get('message', 'Unknown error')}"
else:
return None, f"HTTP Error: {response.status_code}"
except Exception as e:
return None, f"Request Exception: {str(e)}"
def check_task_status(task_id):
"""
Query task status
"""
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {APIKEY}'
}
data = {
"task_id": task_id
}
try:
response = requests.post(
f'{UKAPIURL}/status_image_edit',
headers=headers,
json=data
)
if response.status_code == 200:
result = response.json()
if result.get('code') == 0:
task_data = result['data']
return task_data['status'], task_data.get('output1'), task_data
else:
return 'error', None, result.get('message', 'Unknown error')
else:
return 'error', None, f"HTTP Error: {response.status_code}"
except Exception as e:
return 'error', None, f"Request Exception: {str(e)}"
def process_image_edit(img_input, prompt, progress_callback=None):
"""
Complete process for image editing
Args:
img_input: Can be file path (str) or PIL Image object
prompt: Editing instructions
progress_callback: Progress callback function
"""
temp_img_path = None
try:
# Generate client IP and timestamp
client_ip = "127.0.0.1" # Default IP
time_id = int(time.time())
# Process input image - supports PIL Image and file path
if hasattr(img_input, 'save'): # PIL Image object
# Create temporary file
temp_dir = tempfile.mkdtemp()
temp_img_path = os.path.join(temp_dir, f"temp_img_{time_id}.jpg")
# Save PIL Image as temporary file
if img_input.mode != 'RGB':
img_input = img_input.convert('RGB')
img_input.save(temp_img_path, 'JPEG', quality=95)
img_path = temp_img_path
print(f"💾 PIL Image saved as temporary file: {temp_img_path}")
else:
# Assume it's a file path
img_path = img_input
if progress_callback:
progress_callback("uploading image...")
# Upload user image
uploaded_url = upload_user_img_r2(client_ip, time_id, img_path)
if not uploaded_url:
return None, "image upload failed"
# Extract actual image URL from upload URL
if "?" in uploaded_url:
uploaded_url = uploaded_url.split("?")[0]
if progress_callback:
progress_callback("submitting edit task...")
# Submit image editing task
task_id, error = submit_image_edit_task(uploaded_url, prompt)
if error:
return None, error
if progress_callback:
progress_callback(f"task submitted, ID: {task_id}, processing...")
# Wait for task completion
max_attempts = 60 # Wait up to 10 minutes
for attempt in range(max_attempts):
status, output_url, task_data = check_task_status(task_id)
if status == 'completed':
if output_url:
return output_url, "image edit completed"
else:
return None, "Task completed but no result image returned"
elif status == 'error' or status == 'failed':
return None, f"task processing failed: {task_data}"
elif status in ['queued', 'processing', 'running', 'created', 'working']:
if progress_callback:
progress_callback(f"task processing... (status: {status})")
time.sleep(1) # 等待10秒后重试
else:
if progress_callback:
progress_callback(f"unknown status: {status}")
time.sleep(1)
return None, "task processing timeout"
except Exception as e:
return None, f"error occurred during processing: {str(e)}"
finally:
# 清理临时文件
if temp_img_path and os.path.exists(temp_img_path):
try:
os.remove(temp_img_path)
# 尝试删除临时目录(如果为空)
temp_dir = os.path.dirname(temp_img_path)
if os.path.exists(temp_dir):
os.rmdir(temp_dir)
print(f"🗑️ Cleaned up temporary file: {temp_img_path}")
except Exception as cleanup_error:
print(f"⚠️ Failed to clean up temporary file: {cleanup_error}")
def process_local_image_edit(base_image, layers, prompt, progress_callback=None):
"""
处理局部图片编辑的完整流程
Args:
base_image (PIL.Image): 原始图片
layers (list): ImageEditor的层数据
prompt (str): 编辑指令
progress_callback: 进度回调函数
"""
temp_img_path = None
temp_mask_path = None
try:
# Generate client IP and timestamp
client_ip = "127.0.0.1" # Default IP
time_id = int(time.time())
if progress_callback:
progress_callback("正在创建mask图片...")
# 从layers创建mask图片
mask_image = create_mask_from_layers(base_image, layers)
# 检查mask是否有内容
mask_array = np.array(mask_image)
if np.max(mask_array) == 0:
return None, "请在图片上绘制需要编辑的区域"
print(f"📝 创建mask图片成功,绘制区域像素数: {np.sum(mask_array > 0)}")
if progress_callback:
progress_callback("正在上传原始图片...")
# 处理并上传原始图片
temp_dir = tempfile.mkdtemp()
temp_img_path = os.path.join(temp_dir, f"temp_img_{time_id}.jpg")
# 保存原始图片
if base_image.mode != 'RGB':
base_image = base_image.convert('RGB')
base_image.save(temp_img_path, 'JPEG', quality=95)
# 上传原始图片
uploaded_url = upload_user_img_r2(client_ip, time_id, temp_img_path)
if not uploaded_url:
return None, "原始图片上传失败"
# 从上传 URL 中提取实际的图片 URL
if "?" in uploaded_url:
uploaded_url = uploaded_url.split("?")[0]
if progress_callback:
progress_callback("正在上传mask图片...")
# 上传mask图片
mask_url = upload_mask_image_r2(client_ip, time_id, mask_image)
if not mask_url:
return None, "mask图片上传失败"
# 从上传 URL 中提取实际的图片 URL
if "?" in mask_url:
mask_url = mask_url.split("?")[0]
print(f"📤 图片上传成功:")
print(f" 原始图片: {uploaded_url}")
print(f" Mask图片: {mask_url}")
if progress_callback:
progress_callback("正在提交局部编辑任务...")
# 提交局部图片编辑任务 (task_type=81)
task_id, error = submit_image_edit_task(uploaded_url, prompt, task_type="81", mask_image_url=mask_url)
if error:
return None, error
if progress_callback:
progress_callback(f"任务已提交,ID: {task_id},正在处理...")
print(f"🚀 局部编辑任务已提交,任务ID: {task_id}")
# Wait for task completion
max_attempts = 60 # Wait up to 10 minutes
for attempt in range(max_attempts):
status, output_url, task_data = check_task_status(task_id)
if status == 'completed':
if output_url:
print(f"✅ 局部编辑任务完成,结果: {output_url}")
return output_url, "局部图片编辑完成"
else:
return None, "任务完成但未返回结果图片"
elif status == 'error' or status == 'failed':
return None, f"任务处理失败: {task_data}"
elif status in ['queued', 'processing', 'running', 'created', 'working']:
if progress_callback:
progress_callback(f"正在处理中... (状态: {status})")
time.sleep(1) # Wait 1 second before retry
else:
if progress_callback:
progress_callback(f"未知状态: {status}")
time.sleep(1)
return None, "任务处理超时"
except Exception as e:
print(f"❌ 局部编辑处理异常: {str(e)}")
return None, f"处理过程中发生错误: {str(e)}"
finally:
# 清理临时文件
if temp_img_path and os.path.exists(temp_img_path):
try:
os.remove(temp_img_path)
temp_dir = os.path.dirname(temp_img_path)
if os.path.exists(temp_dir):
os.rmdir(temp_dir)
print(f"🗑️ Cleaned up temporary file: {temp_img_path}")
except Exception as cleanup_error:
print(f"⚠️ Failed to clean up temporary file: {cleanup_error}")
if __name__ == "__main__":
pass
|