Spaces:
Sleeping
Sleeping
| import os | |
| import requests | |
| from flask import Flask, Response, request, stream_with_context # session از ایمپورتها حذف شد | |
| from urllib.parse import urlparse, urljoin, urlunparse | |
| import re | |
| import random | |
| import logging | |
| import json | |
| app = Flask(__name__) | |
| # هیچ secret_key یا تنظیمات مربوط به سشن اضافه نشده است | |
| SECRET_NAME_FOR_TARGET_URL = "TARGET_HF_SPACE_URL" | |
| TARGET_URL_FROM_SECRET = os.environ.get(SECRET_NAME_FOR_TARGET_URL) | |
| REQUEST_TIMEOUT = 45 | |
| log = logging.getLogger('werkzeug') | |
| log.setLevel(logging.ERROR) | |
| app.logger.setLevel(logging.CRITICAL + 1) # این تنظیم از کد اصلی شما حفظ شده | |
| print(f"APP_STARTUP: TARGET_URL_FROM_SECRET: {'SET' if TARGET_URL_FROM_SECRET else 'NOT SET'}") | |
| COMMON_USER_AGENTS = [ | |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36", | |
| "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.5 Safari/605.1.15", | |
| ] | |
| SIMPLE_TEXT_REPLACEMENTS = { | |
| "ttttttt": "ادیت", | |
| "Chat with Aya": "چت🤖", | |
| "Visualize with Aya": "عکس", | |
| "Speak with Aya": "", | |
| "Model:": "مدل:", | |
| "Developed by:": "توسعهدهنده:", | |
| "License:": "مجوز:", | |
| "Input Prompt": "فرمان ورودی", | |
| "Input Image": "تصویر ورودی", | |
| "Drop Image Here": "تصویر را اینجا رها کنید", | |
| "- or -": "- یا -", | |
| "Click to Upload": "برای آپلود کلیک کنید", | |
| "Ask anything in our 23 languages ...": "به هر یک از ۲۳ زبان سوال خود را بپرسید ...", | |
| "Connecting Our World": "اتصال دنیای ما", | |
| # "متن اصلی دیگری که میخواهید جایگزین شود": "متن جدید شما", | |
| } | |
| GRADIO_HIDE_CSS_STANDARD = """ | |
| <style> | |
| .gradio-container .meta-footer, .gradio-container footer, div[class*="footer"], footer, a[href*="gradio.app"], | |
| .gradio-container button[id*="settings"], .gradio-container div[class*="settings-button"], | |
| a[href*="gradio.app/"], button[title*="Settings"], button[aria-label*="Settings"], div[data-testid*="settings"] { | |
| display: none !important; visibility: hidden !important; opacity: 0 !important; | |
| width: 0 !important; height: 0 !important; overflow: hidden !important; | |
| margin: 0 !important; padding: 0 !important; border: none !important; | |
| font-size: 0 !important; line-height: 0 !important; | |
| } | |
| .gr-proxy-item-hidden-by-text { display: none !important; visibility: hidden !important; } | |
| </style> | |
| """ | |
| def replace_text_server_side(html_content_string, replacements): | |
| processed_content = html_content_string | |
| if replacements: | |
| for old_text, new_text in replacements.items(): | |
| if isinstance(old_text, str) and isinstance(new_text, str): | |
| try: | |
| # استفاده از re.sub برای جایگزینی با نادیده گرفتن بزرگی/کوچکی حروف | |
| # re.escape برای مدیریت کاراکترهای خاص در old_text | |
| processed_content = re.sub(re.escape(old_text), new_text, processed_content, flags=re.IGNORECASE) | |
| except re.error as e: | |
| app.logger.error(f"Regex error during server-side text replacement for '{old_text}': {e}") | |
| else: | |
| app.logger.warning(f"Invalid type for replacement: key='{old_text}', value='{new_text}'") | |
| return processed_content | |
| def process_html_content_server_side_minimal(html_string): | |
| processed_html = html_string | |
| patterns = [ | |
| re.compile(r'<[^>]*(?:class|id)\s*=\s*["\'][^"\']*(?:footer|meta-footer|built-with|gradio-footer)[^"\']*["\'][^>]*>.*?Built with Gradio.*?<\/[^>]+>', re.I | re.S), | |
| re.compile(r'<footer\b[^>]*>.*?Built with Gradio.*?</footer>', re.I | re.S), | |
| re.compile(r'<a\b[^>]*href="https?://(?:www\.)?gradio\.app[^"]*"[^>]*>.*?Built with Gradio.*?</a>', re.I | re.S), | |
| re.compile(r'<div\b[^>]*>.*?Built with Gradio.*?gradio\.app.*?</div>', re.I | re.S), | |
| re.compile(r'<a\b[^>]*href="https?://(?:www\.)?gradio\.app[^"]*"[^>]*>.*?</a>', re.I | re.S), | |
| re.compile(r'<[^>]*(?:class|id)\s*=\s*["\'][^"\']*(?:settings|options)[^"\']*["\'][^>]*>.*?Settings.*?<\/[^>]+>', re.I | re.S), | |
| re.compile(r'<(?:button|a)\b[^>]*>.*?Settings.*?</(?:button|a)>', re.I | re.S), | |
| ] | |
| for p in patterns: processed_html = p.sub('', processed_html) | |
| return processed_html | |
| def proxy_request_handler(path): | |
| # --- START: بخش اضافه شده برای بررسی Referer --- | |
| allowed_referer_domains = ["aisada.ir", "www.aisada.ir"] | |
| # اگر درخواست از نوع OPTIONS (معمولا برای CORS preflight) است، اجازه عبور میدهیم. | |
| # این درخواستها نباید Referer داشته باشند و مسدود کردن آنها مشکل ایجاد میکند. | |
| if request.method == 'OPTIONS': | |
| pass # اجازه میدهیم درخواست به بخش اصلی کد پراکسی برسد | |
| # فقط درخواستهای GET که به نظر میرسد برای بارگذاری یک صفحه HTML هستند، Referer را بررسی میکنیم. | |
| # مسیر خالی (روت)، یا مسیری که به / ختم میشود، یا مسیری که به .html یا .htm ختم میشود. | |
| elif request.method == 'GET' and \ | |
| (path == '' or \ | |
| path.endswith('/') or \ | |
| path.endswith('.html') or \ | |
| path.endswith('.htm')): | |
| referer_header = request.headers.get("Referer") | |
| is_allowed = False | |
| if referer_header: | |
| try: | |
| parsed_referer = urlparse(referer_header) | |
| referer_host = parsed_referer.hostname | |
| if referer_host and referer_host.lower() in allowed_referer_domains: | |
| is_allowed = True | |
| except ValueError: | |
| # اگر Referer قابل تجزیه نباشد، is_allowed همچنان False خواهد بود | |
| pass | |
| if not is_allowed: | |
| # با توجه به تنظیمات لاگ شما، این پیامها در کنسول دیده نخواهند شد مگر اینکه سطح لاگ را تغییر دهید. | |
| # app.logger.warning(f"Access Denied (403) for path '{path}'. Referer: '{referer_header}'. IP: {request.remote_addr}") | |
| return "Error This app is broken. .", 403 | |
| # برای سایر انواع درخواست (POST, PUT, ...) یا درخواستهای GET که به نظر نمیرسد | |
| # صفحه HTML باشند (مانند CSS, JS, تصاویر)، فعلا از بررسی Referer صرفنظر میکنیم. | |
| # فرض بر این است که اگر صفحه اصلی با Referer معتبر بارگذاری شده، درخواستهای داخلی نیز مجاز هستند. | |
| # --- END: بخش اضافه شده برای بررسی Referer --- | |
| if not TARGET_URL_FROM_SECRET: | |
| return "Proxy Configuration Error: Target URL secret is not configured.", 500 | |
| base_target_url = TARGET_URL_FROM_SECRET.rstrip('/') | |
| target_full_url = urljoin(base_target_url + "/", path) | |
| if request.query_string: target_full_url += "?" + request.query_string.decode() | |
| try: | |
| parsed_target_url_for_host = urlparse(TARGET_URL_FROM_SECRET) | |
| target_hostname = parsed_target_url_for_host.hostname | |
| user_agent_to_send = random.choice(COMMON_USER_AGENTS) | |
| excluded_incoming_headers = [ | |
| 'host', 'cookie', 'connection', 'upgrade-insecure-requests', | |
| 'if-none-match', 'if-modified-since', 'referer', 'x-hf-space-host', | |
| 'x-forwarded-for', 'x-forwarded-proto', 'x-forwarded-host', 'accept-encoding' | |
| ] | |
| forward_headers = {key: value for key, value in request.headers.items() if key.lower() not in excluded_incoming_headers} | |
| forward_headers['User-Agent'] = user_agent_to_send | |
| forward_headers['Host'] = target_hostname | |
| if request.environ.get('HTTP_X_FORWARDED_FOR'): forward_headers['X-Forwarded-For'] = request.environ.get('HTTP_X_FORWARDED_FOR') + ', ' + request.remote_addr | |
| else: forward_headers['X-Forwarded-For'] = request.remote_addr | |
| forward_headers['X-Forwarded-Proto'] = request.scheme | |
| forward_headers['X-Forwarded-Host'] = request.host | |
| request_body = request.get_data() if request.method not in ['GET', 'HEAD', 'OPTIONS'] else None | |
| with requests.Session() as s: | |
| target_response = s.request( | |
| method=request.method, url=target_full_url, headers=forward_headers, | |
| data=request_body, stream=True, timeout=REQUEST_TIMEOUT, allow_redirects=False | |
| ) | |
| if 300 <= target_response.status_code < 400 and 'Location' in target_response.headers: | |
| location_header_val = target_response.headers['Location'] | |
| parsed_location = urlparse(location_header_val) | |
| rewritten_location = location_header_val | |
| if parsed_location.scheme and parsed_location.netloc == target_hostname: | |
| rewritten_location = parsed_location.path | |
| if parsed_location.query: rewritten_location += "?" + parsed_location.query | |
| if not rewritten_location.startswith('/') and rewritten_location and not urlparse(rewritten_location).scheme: | |
| rewritten_location = '/' + rewritten_location | |
| final_redirect_headers = {'Location': rewritten_location if rewritten_location else "/"} | |
| for k, v in target_response.headers.items(): | |
| if k.lower() in ['set-cookie', 'cache-control', 'expires', 'pragma', 'vary']: final_redirect_headers[k] = v | |
| return Response(response=None, status=target_response.status_code, headers=final_redirect_headers) | |
| if target_response.status_code >= 400: | |
| error_headers = {k:v for k,v in target_response.headers.items() if k.lower() not in ['content-encoding', 'transfer-encoding', 'content-length', 'link']} | |
| return Response(target_response.content, status=target_response.status_code, headers=error_headers) | |
| content_type = target_response.headers.get('Content-Type', 'application/octet-stream').lower() | |
| excluded_resp_headers = [ | |
| 'content-encoding', 'transfer-encoding', 'connection', 'keep-alive', | |
| 'x-frame-options', 'strict-transport-security', 'public-key-pins', | |
| 'content-length', 'server', 'x-powered-by', 'date', 'alt-svc', 'link' | |
| ] | |
| final_response_headers = {k: v for k, v in target_response.headers.items() if k.lower() not in excluded_resp_headers} | |
| final_response_headers['Content-Type'] = content_type | |
| final_response_headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0' | |
| final_response_headers['Pragma'] = 'no-cache' | |
| final_response_headers['Expires'] = '0' | |
| final_response_headers['Referrer-Policy'] = 'strict-origin-when-cross-origin' | |
| def generate_response_content_stream_inner(): | |
| is_html = 'text/html' in content_type | |
| css_injected = False | |
| js_injected = False | |
| html_buffer = b'' | |
| observer_script = "" | |
| if is_html and SIMPLE_TEXT_REPLACEMENTS: | |
| replacements_json_str = json.dumps(SIMPLE_TEXT_REPLACEMENTS) | |
| js_code_template = """<script id="grProxyTextReplacerScript">(function(){{if(document.getElementById('grProxyTextReplacerScriptLoaded'))return;const el=document.createElement('div');el.id='grProxyTextReplacerScriptLoaded';el.style.display='none';if(document.body)document.body.appendChild(el);else if(document.head)document.head.appendChild(el);const r={json_payload};function esc(s){{return s.replace(/[.*+?^${{}}()|[\]\\\\]/g,'\\\\$&');}}function apply(n){{if(!n)return;if(n.nodeType===Node.ELEMENT_NODE)for(let i=0;i<n.childNodes.length;i++)apply(n.childNodes[i]);else if(n.nodeType===Node.TEXT_NODE){{if(!n.nodeValue||n.nodeValue.trim()==='')return;let t=n.nodeValue;let c=false;for(const k in r){{const v=r[k];const rgx=new RegExp(esc(k),'gi');if(rgx.test(t)){{if(v===""&&n.parentElement&&n.parentElement.tagName!=='BODY'&&n.parentElement.tagName!=='HTML'){{if(n.parentElement)n.parentElement.classList.add('gr-proxy-item-hidden-by-text');t=t.replace(rgx,'');c=true;}}else if(v!==""){t=t.replace(rgx,v);c=true;}}}}if(c)n.nodeValue=t;}}}}function proc(m,o){{apply(document.body);}}const cfg={{childList:true,subtree:true,characterData:true}};const obs=new MutationObserver(proc);function init(){{if(document.body){{apply(document.body);obs.observe(document.body,cfg);}}else setTimeout(init,50);}}if(document.readyState==='loading')document.addEventListener('DOMContentLoaded',init);else init();}})();</script>""" | |
| observer_script = js_code_template.replace("{json_payload}", replacements_json_str) | |
| char_enc_match = None | |
| if is_html: | |
| char_enc_match = re.search(r'charset=([\w-]+)', content_type, re.I) | |
| proc_enc = char_enc_match.group(1) if char_enc_match else 'utf-8' | |
| try: | |
| for chunk in target_response.iter_content(chunk_size=8192, decode_unicode=False): | |
| if not chunk: continue | |
| if is_html: | |
| html_buffer += chunk | |
| if len(html_buffer) < 4096 and not (b"</head>" in html_buffer or b"</body>" in html_buffer or b"</html>" in html_buffer): | |
| continue | |
| try: str_buffer = html_buffer.decode(proc_enc, 'replace') | |
| except UnicodeDecodeError: str_buffer = html_buffer.decode('latin-1', 'replace') | |
| processed_str = str_buffer | |
| if SIMPLE_TEXT_REPLACEMENTS: | |
| processed_str = replace_text_server_side(processed_str, SIMPLE_TEXT_REPLACEMENTS) | |
| if not css_injected: | |
| head_m = re.search(r'(<head\b[^>]*>)', processed_str, re.I) | |
| if head_m: | |
| processed_str = processed_str[:head_m.end(1)] + GRADIO_HIDE_CSS_STANDARD + processed_str[head_m.end(1):] | |
| css_injected = True | |
| processed_str = process_html_content_server_side_minimal(processed_str) | |
| if observer_script and not js_injected: | |
| body_end_m = re.search(r'(</body>)', processed_str, re.I) | |
| if body_end_m: | |
| processed_str = processed_str[:body_end_m.start(1)] + observer_script + processed_str[body_end_m.start(1):] | |
| js_injected = True | |
| yield processed_str.encode(proc_enc, 'replace') | |
| html_buffer = b'' | |
| else: | |
| yield chunk | |
| if html_buffer and is_html: | |
| try: str_buffer = html_buffer.decode(proc_enc, 'replace') | |
| except UnicodeDecodeError: str_buffer = html_buffer.decode('latin-1', 'replace') | |
| final_proc_str = str_buffer | |
| if SIMPLE_TEXT_REPLACEMENTS: | |
| final_proc_str = replace_text_server_side(final_proc_str, SIMPLE_TEXT_REPLACEMENTS) | |
| if not css_injected: | |
| head_m = re.search(r'(<head\b[^>]*>)', final_proc_str, re.I) | |
| payload = GRADIO_HIDE_CSS_STANDARD | |
| if head_m: final_proc_str = final_proc_str[:head_m.end(1)] + payload + final_proc_str[head_m.end(1):] | |
| else: final_proc_str = payload + final_proc_str | |
| final_proc_str = process_html_content_server_side_minimal(final_proc_str) | |
| if observer_script and not js_injected and not re.search(r'<script id="grProxyTextReplacerScript">', final_proc_str, re.I): | |
| body_end_m = re.search(r'(</body>)', final_proc_str, re.I) | |
| if body_end_m: final_proc_str = final_proc_str[:body_end_m.start(1)] + observer_script + final_proc_str[body_end_m.start(1):] | |
| else: final_proc_str += observer_script | |
| yield final_proc_str.encode(proc_enc, 'replace') | |
| elif html_buffer: | |
| yield html_buffer | |
| except Exception as e_stream: | |
| app.logger.error(f"Stream error for {target_full_url}: {e_stream}") | |
| finally: | |
| target_response.close() | |
| if request.method == 'HEAD': | |
| return Response(None, status=target_response.status_code, headers=final_response_headers) | |
| else: | |
| if 'x-frame-options' in final_response_headers: del final_response_headers['x-frame-options'] | |
| return Response(stream_with_context(generate_response_content_stream_inner()), status=target_response.status_code, headers=final_response_headers) | |
| except requests.exceptions.Timeout: | |
| app.logger.error(f"Timeout ({REQUEST_TIMEOUT}s) for {target_full_url}") | |
| return "Error: Request to target timed out.", 504 | |
| except requests.exceptions.HTTPError as e_http: | |
| error_headers = {k:v for k,v in e_http.response.headers.items() if k.lower() not in ['content-encoding', 'transfer-encoding', 'content-length', 'link']} | |
| return Response(e_http.response.content, status=e_http.response.status_code, headers=error_headers) | |
| except requests.exceptions.RequestException as e_req: | |
| app.logger.error(f"RequestException for {target_full_url}: {e_req}") | |
| return f"Error fetching content: {e_req}", 502 | |
| except Exception as e_gen: | |
| app.logger.exception(f"Unexpected error proxying {target_full_url}") | |
| return f"Unexpected server error: {str(e_gen)}", 500 | |
| if __name__ == '__main__': | |
| port = int(os.environ.get("PORT", 7860)) | |
| debug_mode = os.environ.get("FLASK_DEBUG", "false").lower() == "true" | |
| print(f"INFO: Starting Flask app on host 0.0.0.0, port {port}, debug: {debug_mode}") | |
| if not os.environ.get("WITH_GUNICORN"): | |
| app.run(host='0.0.0.0', port=port, debug=debug_mode) |