import gradio as gr import threading import os import shutil import tempfile import time from util import process_image_edit, get_country_info_safe, get_location_info_safe, contains_chinese from nfsw import NSFWDetector # 配置参数 NSFW_TIME_WINDOW = 5 # 时间窗口:5分钟 NSFW_LIMIT = 6 # 限制次数:6次 IP_Dict = {} NSFW_Dict = {} # 记录每个IP的NSFW违规次数 NSFW_Time_Dict = {} # 记录每个IP在特定时间窗口的NSFW检测次数,键格式: "ip_timestamp" def get_current_time_window(): """ 获取当前的整点时间窗口 Returns: tuple: (窗口开始时间戳, 窗口结束时间戳) """ current_time = time.time() # 获取当前时间的分钟数 current_struct = time.localtime(current_time) current_minute = current_struct.tm_min # 计算当前5分钟时间窗口的开始分钟 window_start_minute = (current_minute // NSFW_TIME_WINDOW) * NSFW_TIME_WINDOW # 构建窗口开始时间 window_start_struct = time.struct_time(( current_struct.tm_year, current_struct.tm_mon, current_struct.tm_mday, current_struct.tm_hour, window_start_minute, 0, current_struct.tm_wday, current_struct.tm_yday, current_struct.tm_isdst )) window_start_time = time.mktime(window_start_struct) window_end_time = window_start_time + (NSFW_TIME_WINDOW * 60) return window_start_time, window_end_time def check_nsfw_rate_limit(client_ip): """ 检查IP的NSFW检测频率限制(基于整点时间窗口) Args: client_ip (str): 客户端IP地址 Returns: tuple: (是否超过限制, 剩余等待时间) """ current_time = time.time() window_start_time, window_end_time = get_current_time_window() # 清理不在当前时间窗口的记录 current_window_key = f"{client_ip}_{int(window_start_time)}" # 如果没有当前窗口的记录,创建新的 if current_window_key not in NSFW_Time_Dict: NSFW_Time_Dict[current_window_key] = 0 # 清理旧的窗口记录(保持内存清洁) keys_to_remove = [] for key in NSFW_Time_Dict: if key.startswith(client_ip + "_"): window_time = int(key.split("_")[1]) if window_time < window_start_time: keys_to_remove.append(key) for key in keys_to_remove: del NSFW_Time_Dict[key] # 检查当前窗口是否超过限制 if NSFW_Time_Dict[current_window_key] >= NSFW_LIMIT: # 计算到下一个时间窗口的等待时间 wait_time = window_end_time - current_time return True, max(0, wait_time) return False, 0 def record_nsfw_detection(client_ip): """ 记录IP的NSFW检测时间(基于整点时间窗口) Args: client_ip (str): 客户端IP地址 """ window_start_time, _ = get_current_time_window() current_window_key = f"{client_ip}_{int(window_start_time)}" # 增加当前窗口的计数 if current_window_key not in NSFW_Time_Dict: NSFW_Time_Dict[current_window_key] = 0 NSFW_Time_Dict[current_window_key] += 1 # 记录到NSFW_Dict中(兼容现有逻辑) if client_ip not in NSFW_Dict: NSFW_Dict[client_ip] = 0 NSFW_Dict[client_ip] += 1 def get_current_window_info(client_ip): """ 获取当前窗口的统计信息(用于调试) Args: client_ip (str): 客户端IP地址 Returns: dict: 当前窗口的统计信息 """ window_start_time, window_end_time = get_current_time_window() current_window_key = f"{client_ip}_{int(window_start_time)}" current_count = NSFW_Time_Dict.get(current_window_key, 0) # 格式化时间显示 start_time_str = time.strftime("%H:%M:%S", time.localtime(window_start_time)) end_time_str = time.strftime("%H:%M:%S", time.localtime(window_end_time)) return { "window_start": start_time_str, "window_end": end_time_str, "current_count": current_count, "limit": NSFW_LIMIT, "window_key": current_window_key } # 初始化NSFW检测器(从Hugging Face下载) try: nsfw_detector = NSFWDetector() # 自动从Hugging Face下载falconsai_yolov9_nsfw_model_quantized.pt print("✅ NSFW检测器初始化成功") except Exception as e: print(f"❌ NSFW检测器初始化失败: {e}") nsfw_detector = None def edit_image_interface(input_image, prompt, request: gr.Request, progress=gr.Progress()): """ Interface function for processing image editing """ # 提取用户IP client_ip = request.client.host x_forwarded_for = dict(request.headers).get('x-forwarded-for') if x_forwarded_for: client_ip = x_forwarded_for if client_ip not in IP_Dict: IP_Dict[client_ip] = 0 IP_Dict[client_ip] += 1 # 获取IP属地信息 country_info = get_country_info_safe(client_ip) location_info = get_location_info_safe(client_ip) # # 检查IP是否因NSFW违规过多而被屏蔽 3 # if client_ip in NSFW_Dict and NSFW_Dict[client_ip] >= 5: # print(f"❌ IP blocked due to excessive NSFW violations - IP: {client_ip}({country_info}), violations: {NSFW_Dict[client_ip]}") # # 检查是否需要特殊处理(中文prompt或中国IP) # has_chinese = contains_chinese(prompt.strip()) if prompt else False # is_china_ip = location_info.get("country", "").lower() == "中国" # if has_chinese or is_china_ip: # # 构建特殊警告信息 # prov = location_info.get("prov", "未知省份") # city = location_info.get("city", "未知城市") # isp = location_info.get("isp", "Unknown") # # 处理城市名称 # city_name = city.replace("市", "") if city != "未知城市" else "当地" # # 处理ISP名称,去掉"中国"前缀 # isp_name = isp.replace("中国", "") if isp != "Unknown" else "未知" # warning_msg = f"您的ip地址为{client_ip},地点为{prov}-{city},您的网络服务商为{isp_name},检测到您目前正在浏览,制作淫秽内容,已经上报给 {city_name}市公安局,已加急处理,请耐心等待警察上门调查" # return None, f"❌ {warning_msg}" # else: # return None, f"❌ Your ip {client_ip}, your region has been blocked for too much nsfw content" if input_image is None: return None, "Please upload an image first" if not prompt or prompt.strip() == "": return None, "Please enter editing prompt" # 检查prompt长度是否大于3个字符 if len(prompt.strip()) <= 3: return None, "❌ Editing prompt must be more than 3 characters" # 检查图片是否包含NSFW内容 nsfw_result = None if nsfw_detector is not None and input_image is not None: try: # 直接使用PIL Image对象进行检测,避免文件路径问题 nsfw_result = nsfw_detector.predict_pil_label_only(input_image) print(f"🔍 NSFW检测结果: {nsfw_result} - IP: {client_ip}({country_info})") if nsfw_result.lower() == "nsfw": # 检查NSFW频率限制 is_rate_limited, wait_time = check_nsfw_rate_limit(client_ip) if is_rate_limited: # 超过频率限制,显示等待提示并阻止继续 wait_minutes = int(wait_time / 60) + 1 # 向上取整到分钟 window_info = get_current_window_info(client_ip) print(f"⚠️ NSFW频率限制 - IP: {client_ip}({country_info})") print(f" 时间窗口: {window_info['window_start']} - {window_info['window_end']}") print(f" 当前计数: {window_info['current_count']}/{NSFW_LIMIT}, 需要等待 {wait_minutes} 分钟") return None, f"❌ Please wait {wait_minutes} minutes before generating again" else: # 未超过频率限制,记录此次检测但允许继续处理 record_nsfw_detection(client_ip) window_info = get_current_window_info(client_ip) # print(f"🔍 NSFW检测记录 - IP: {client_ip}({country_info})") # print(f" 时间窗口: {window_info['window_start']} - {window_info['window_end']}") # print(f" 当前计数: {window_info['current_count']}/{NSFW_LIMIT}, 允许继续处理") # 不return,允许继续处理图片编辑 except Exception as e: print(f"⚠️ NSFW检测失败: {e}") # 检测失败时允许继续处理 if IP_Dict[client_ip]>10 and country_info.lower() in ["印度", "巴基斯坦"]: print(f"❌ Content not allowed - IP: {client_ip}({country_info}), count: {IP_Dict[client_ip]}, prompt: {prompt.strip()}") return None, "❌ Content not allowed. Please modify your prompt" # if IP_Dict[client_ip]>18 and country_info.lower() in ["中国"]: # print(f"❌ Content not allowed - IP: {client_ip}({country_info}), count: {IP_Dict[client_ip]}, prompt: {prompt.strip()}") # return None, "❌ Content not allowed. Please modify your prompt" # if client_ip.lower() in ["221.194.171.230", "101.126.56.37", "101.126.56.44"]: # print(f"❌ Content not allowed - IP: {client_ip}({country_info}), count: {IP_Dict[client_ip]}, prompt: {prompt.strip()}") # return None, "❌ Content not allowed. Please modify your prompt" result_url = None status_message = "" def progress_callback(message): nonlocal status_message status_message = message progress(0.5, desc=message) try: # 打印成功访问的信息 print(f"✅ Processing started - IP: {client_ip}({country_info}), count: {IP_Dict[client_ip]}, prompt: {prompt.strip()}", flush=True) # Call image editing processing function result_url, message = process_image_edit(input_image, prompt.strip(), progress_callback) if result_url: print(f"✅ Processing completed successfully - IP: {client_ip}({country_info}), result_url: {result_url}", flush=True) progress(1.0, desc="Processing completed") return result_url, "✅ " + message else: print(f"❌ Processing failed - IP: {client_ip}({country_info}), error: {message}", flush=True) return None, "❌ " + message except Exception as e: return None, f"❌ Error occurred during processing: {str(e)}" # Create Gradio interface def create_app(): with gr.Blocks( title="AI Image Editor", theme=gr.themes.Soft(), css=""" .main-container { max-width: 1200px; margin: 0 auto; } .upload-area { border: 2px dashed #ccc; border-radius: 10px; padding: 20px; text-align: center; } .result-area { margin-top: 20px; padding: 20px; border-radius: 10px; background-color: #f8f9fa; } """ ) as app: gr.Markdown( """ # 🎨 AI Image Editor """, elem_classes=["main-container"] ) with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 📸 Upload Image") input_image = gr.Image( label="Select image to edit", type="pil", height=400, elem_classes=["upload-area"] ) gr.Markdown("### ✍️ Editing Instructions") prompt_input = gr.Textbox( label="Enter editing prompt", placeholder="For example: change background to beach, add rainbow, remove background, etc...", lines=3, max_lines=5 ) edit_button = gr.Button( "🚀 Start Editing", variant="primary", size="lg" ) with gr.Column(scale=1): gr.Markdown("### 🎯 Editing Result") output_image = gr.Image( label="Edited image", height=400, elem_classes=["result-area"] ) status_output = gr.Textbox( label="Processing status", lines=2, max_lines=3, interactive=False ) # Example area gr.Markdown("### 💡 Prompt Examples") with gr.Row(): example_prompts = [ "Change the character's background to a sunny seaside with blue waves.", "Change the character's background to New York at night with neon lights.", "Change the character's background to a fairytale castle with bright colors.", "Change background to forest", "Change background to snow mountain" ] for prompt in example_prompts: gr.Button( prompt, size="sm" ).click( lambda p=prompt: p, outputs=prompt_input ) # Bind button click event edit_button.click( fn=edit_image_interface, inputs=[input_image, prompt_input], outputs=[output_image, status_output], show_progress=True ) return app if __name__ == "__main__": app = create_app() app.queue() # Enable queue to handle concurrent requests app.launch()