# Translate-all / app.py
# Asrasahar's picture
# Update app.py
# bb5e083 verified
import codecs
import json
import logging
import os
import random
import re
from urllib.parse import urlparse, urljoin, urlunparse

import requests
from flask import Flask, Response, request, stream_with_context  # session was removed from the imports
# Flask application object that serves every proxied route.
app = Flask(__name__)
# No secret_key or session-related configuration has been added.
# Name of the environment variable that holds the upstream URL to proxy to.
SECRET_NAME_FOR_TARGET_URL = "TARGET_HF_SPACE_URL"
# Upstream base URL read once at import time; None when the variable is unset.
TARGET_URL_FROM_SECRET = os.environ.get(SECRET_NAME_FOR_TARGET_URL)
# Passed as the `timeout` argument of the upstream request.
REQUEST_TIMEOUT = 45
# Only errors from the werkzeug request logger are emitted.
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR)
# A level above CRITICAL silences every app.logger call in this file.
app.logger.setLevel(logging.CRITICAL + 1) # This setting is kept from your original code
# Report only whether the target URL is configured, never its value.
print(f"APP_STARTUP: TARGET_URL_FROM_SECRET: {'SET' if TARGET_URL_FROM_SECRET else 'NOT SET'}")
# One of these is picked at random as the User-Agent of each upstream request.
COMMON_USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.5 Safari/605.1.15",
]
# Maps text found in the upstream HTML to its replacement. The same table is
# applied server-side (case-insensitively) and serialized into the injected
# client-side script; an empty value removes the text.
SIMPLE_TEXT_REPLACEMENTS = {
"ttttttt": "ادیت",
"Chat with Aya": "چت🤖",
"Visualize with Aya": "عکس",
"Speak with Aya": "",
"Model:": "مدل:",
"Developed by:": "توسعه‌دهنده:",
"License:": "مجوز:",
"Input Prompt": "فرمان ورودی",
"Input Image": "تصویر ورودی",
"Drop Image Here": "تصویر را اینجا رها کنید",
"- or -": "- یا -",
"Click to Upload": "برای آپلود کلیک کنید",
"Ask anything in our 23 languages ...": "به هر یک از ۲۳ زبان سوال خود را بپرسید ...",
"Connecting Our World": "اتصال دنیای ما",
# "another original text you want replaced": "your new text",
}
# <style> block inserted right after the opening <head> tag of proxied HTML.
# It hides footer/settings elements and defines the class the injected script
# adds to elements whose text is replaced by an empty string.
GRADIO_HIDE_CSS_STANDARD = """
<style>
.gradio-container .meta-footer, .gradio-container footer, div[class*="footer"], footer, a[href*="gradio.app"],
.gradio-container button[id*="settings"], .gradio-container div[class*="settings-button"],
a[href*="gradio.app/"], button[title*="Settings"], button[aria-label*="Settings"], div[data-testid*="settings"] {
display: none !important; visibility: hidden !important; opacity: 0 !important;
width: 0 !important; height: 0 !important; overflow: hidden !important;
margin: 0 !important; padding: 0 !important; border: none !important;
font-size: 0 !important; line-height: 0 !important;
}
.gr-proxy-item-hidden-by-text { display: none !important; visibility: hidden !important; }
</style>
"""
def replace_text_server_side(html_content_string, replacements):
    """Replace every key of *replacements* found in the text with its value.

    Matching is literal (the key is regex-escaped) and case-insensitive.
    The replacement is inserted verbatim: backslashes and group references
    such as ``\\1`` in the value are NOT interpreted as regex template
    escapes.

    Args:
        html_content_string: Text to process.
        replacements: Mapping of ``old_text -> new_text``; may be empty or
            ``None``. Entries whose key or value is not a ``str`` are
            skipped with a warning; empty keys are skipped silently.

    Returns:
        The text with all replacements applied, in mapping order.
    """
    processed_content = html_content_string
    if not replacements:
        return processed_content

    for old_text, new_text in replacements.items():
        if not (isinstance(old_text, str) and isinstance(new_text, str)):
            app.logger.warning(f"Invalid type for replacement: key='{old_text}', value='{new_text}'")
            continue
        if not old_text:
            # An empty pattern matches between every character and would
            # splice new_text throughout the whole document.
            continue
        # A callable replacement bypasses re's template processing, so the
        # value is inserted exactly as written and cannot raise re.error.
        processed_content = re.sub(
            re.escape(old_text),
            lambda _match, _value=new_text: _value,
            processed_content,
            flags=re.IGNORECASE,
        )
    return processed_content
# Compiled once at import time: the function below runs for every streamed
# HTML chunk, so rebuilding this list per call is wasted work.
# NOTE(review): several patterns start at the first matching opening tag and
# lazily extend to the first later occurrence of the marker text (e.g. the
# bare ``<div ...>`` and ``<button|a ...>`` patterns), so they can remove much
# more markup than the branding element itself — confirm against real pages.
_GRADIO_BRANDING_PATTERNS = tuple(
    re.compile(pattern, re.I | re.S)
    for pattern in (
        r'<[^>]*(?:class|id)\s*=\s*["\'][^"\']*(?:footer|meta-footer|built-with|gradio-footer)[^"\']*["\'][^>]*>.*?Built with Gradio.*?<\/[^>]+>',
        r'<footer\b[^>]*>.*?Built with Gradio.*?</footer>',
        r'<a\b[^>]*href="https?://(?:www\.)?gradio\.app[^"]*"[^>]*>.*?Built with Gradio.*?</a>',
        r'<div\b[^>]*>.*?Built with Gradio.*?gradio\.app.*?</div>',
        r'<a\b[^>]*href="https?://(?:www\.)?gradio\.app[^"]*"[^>]*>.*?</a>',
        r'<[^>]*(?:class|id)\s*=\s*["\'][^"\']*(?:settings|options)[^"\']*["\'][^>]*>.*?Settings.*?<\/[^>]+>',
        r'<(?:button|a)\b[^>]*>.*?Settings.*?</(?:button|a)>',
    )
)


def process_html_content_server_side_minimal(html_string):
    """Strip "Built with Gradio" footers, gradio.app links and Settings controls.

    Each pattern in ``_GRADIO_BRANDING_PATTERNS`` is applied in order
    (case-insensitive, dot matches newline) and every match is removed.

    Args:
        html_string: HTML text (possibly only a fragment of the document).

    Returns:
        The text with all matches deleted.
    """
    processed_html = html_string
    for pattern in _GRADIO_BRANDING_PATTERNS:
        processed_html = pattern.sub('', processed_html)
    return processed_html
# Hostnames whose pages may embed/link to the proxied HTML pages.
_ALLOWED_REFERER_DOMAINS = ("aisada.ir", "www.aisada.ir")

# Incoming request headers that are not forwarded upstream (lower-case).
_EXCLUDED_INCOMING_HEADERS = frozenset((
    'host', 'cookie', 'connection', 'upgrade-insecure-requests',
    'if-none-match', 'if-modified-since', 'referer', 'x-hf-space-host',
    'x-forwarded-for', 'x-forwarded-proto', 'x-forwarded-host', 'accept-encoding',
))

# Upstream headers dropped from error (>= 400) responses (lower-case).
_EXCLUDED_ERROR_HEADERS = frozenset((
    'content-encoding', 'transfer-encoding', 'content-length', 'link',
))

# Upstream headers dropped from successful responses (lower-case). The last
# five are set explicitly by the handler; excluding them here prevents a
# duplicate header when upstream spells the name with different casing.
_EXCLUDED_RESPONSE_HEADERS = frozenset((
    'content-encoding', 'transfer-encoding', 'connection', 'keep-alive',
    'x-frame-options', 'strict-transport-security', 'public-key-pins',
    'content-length', 'server', 'x-powered-by', 'date', 'alt-svc', 'link',
    'content-type', 'cache-control', 'pragma', 'expires', 'referrer-policy',
))

# Upstream headers copied onto rewritten redirect responses (lower-case).
_REDIRECT_PASSTHROUGH_HEADERS = frozenset((
    'set-cookie', 'cache-control', 'expires', 'pragma', 'vary',
))

# Client-side script appended before </body>; "{json_payload}" is substituted
# with the JSON form of SIMPLE_TEXT_REPLACEMENTS via str.replace.
# NOTE(review): the template still carries str.format-style doubled braces
# ("{{" / "}}") although it is filled with str.replace, so they reach the
# browser literally (e.g. "const cfg={{childList:true,...}}"). That looks like
# a JavaScript syntax error that would stop the whole script from running —
# verify in a browser before relying on the client-side replacements. The
# string is intentionally left byte-identical here.
_TEXT_REPLACER_JS_TEMPLATE = """<script id="grProxyTextReplacerScript">(function(){{if(document.getElementById('grProxyTextReplacerScriptLoaded'))return;const el=document.createElement('div');el.id='grProxyTextReplacerScriptLoaded';el.style.display='none';if(document.body)document.body.appendChild(el);else if(document.head)document.head.appendChild(el);const r={json_payload};function esc(s){{return s.replace(/[.*+?^${{}}()|[\]\\\\]/g,'\\\\$&');}}function apply(n){{if(!n)return;if(n.nodeType===Node.ELEMENT_NODE)for(let i=0;i<n.childNodes.length;i++)apply(n.childNodes[i]);else if(n.nodeType===Node.TEXT_NODE){{if(!n.nodeValue||n.nodeValue.trim()==='')return;let t=n.nodeValue;let c=false;for(const k in r){{const v=r[k];const rgx=new RegExp(esc(k),'gi');if(rgx.test(t)){{if(v===""&&n.parentElement&&n.parentElement.tagName!=='BODY'&&n.parentElement.tagName!=='HTML'){{if(n.parentElement)n.parentElement.classList.add('gr-proxy-item-hidden-by-text');t=t.replace(rgx,'');c=true;}}else if(v!==""){t=t.replace(rgx,v);c=true;}}}}if(c)n.nodeValue=t;}}}}function proc(m,o){{apply(document.body);}}const cfg={{childList:true,subtree:true,characterData:true}};const obs=new MutationObserver(proc);function init(){{if(document.body){{apply(document.body);obs.observe(document.body,cfg);}}else setTimeout(init,50);}}if(document.readyState==='loading')document.addEventListener('DOMContentLoaded',init);else init();}})();</script>"""


def _referer_is_allowed(referer_header, allowed_domains=_ALLOWED_REFERER_DOMAINS):
    """Return True when the Referer's hostname is one of *allowed_domains*."""
    if not referer_header:
        return False
    try:
        referer_host = urlparse(referer_header).hostname
    except ValueError:
        # An unparseable Referer is treated like a missing one.
        return False
    return bool(referer_host) and referer_host.lower() in allowed_domains


def _rewrite_redirect_location(location_header_val, target_hostname):
    """Turn an upstream Location into a path served by this proxy when possible."""
    parsed_location = urlparse(location_header_val)
    rewritten_location = location_header_val
    if parsed_location.scheme and parsed_location.netloc == target_hostname:
        # Absolute redirect to the upstream host: keep only path + query so
        # the browser stays on the proxy's own origin.
        rewritten_location = parsed_location.path
        if parsed_location.query:
            rewritten_location += "?" + parsed_location.query
    if (rewritten_location
            and not rewritten_location.startswith('/')
            and not urlparse(rewritten_location).scheme):
        rewritten_location = '/' + rewritten_location
    return rewritten_location if rewritten_location else "/"


@app.route('/', defaults={'path': ''}, methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', 'HEAD'])
@app.route('/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS', 'HEAD'])
def proxy_request_handler(path):
    """Forward the incoming request to the configured target and relay the reply.

    Behaviour, in order:

    * GET requests that look like HTML page loads (empty path, or a path
      ending in ``/``, ``.html`` or ``.htm``) must carry a Referer whose host
      is in ``_ALLOWED_REFERER_DOMAINS``; otherwise a 403 is returned. Other
      methods and asset-like GETs are not checked.
    * The request is replayed against ``TARGET_URL_FROM_SECRET`` with a
      filtered header set and without following redirects.
    * 3xx responses get their ``Location`` rewritten to stay on the proxy.
    * Responses with status >= 400 are relayed as-is (minus framing headers).
    * Successful ``text/html`` bodies are streamed through text replacement,
      CSS injection, branding removal and script injection; every other
      content type is streamed through untouched.

    Args:
        path: Request path captured by the route (without a leading slash).

    Returns:
        A Flask ``Response`` or a ``(body, status)`` tuple. Upstream failures
        map to 504 (timeout), 502 (other request errors) or 500; these bodies
        are deliberately generic because exception messages from ``requests``
        contain the target host/URL, which is configured as a secret.
    """
    # --- Referer check -----------------------------------------------------
    # OPTIONS requests (typically CORS preflights) are never checked, and
    # neither are non-GET methods or GETs for assets (CSS, JS, images): the
    # assumption is that they follow a page load that already passed the check.
    looks_like_page_load = request.method == 'GET' and (
        path == '' or path.endswith(('/', '.html', '.htm'))
    )
    if looks_like_page_load and not _referer_is_allowed(request.headers.get("Referer")):
        return "Error This app is broken. .", 403

    if not TARGET_URL_FROM_SECRET:
        return "Proxy Configuration Error: Target URL secret is not configured.", 500

    base_target_url = TARGET_URL_FROM_SECRET.rstrip('/')
    target_full_url = urljoin(base_target_url + "/", path)
    if request.query_string:
        target_full_url += "?" + request.query_string.decode()

    try:
        target_hostname = urlparse(TARGET_URL_FROM_SECRET).hostname

        forward_headers = {
            key: value for key, value in request.headers.items()
            if key.lower() not in _EXCLUDED_INCOMING_HEADERS
        }
        forward_headers['User-Agent'] = random.choice(COMMON_USER_AGENTS)
        forward_headers['Host'] = target_hostname
        incoming_forwarded_for = request.environ.get('HTTP_X_FORWARDED_FOR')
        if incoming_forwarded_for:
            forward_headers['X-Forwarded-For'] = incoming_forwarded_for + ', ' + request.remote_addr
        else:
            forward_headers['X-Forwarded-For'] = request.remote_addr
        forward_headers['X-Forwarded-Proto'] = request.scheme
        forward_headers['X-Forwarded-Host'] = request.host

        request_body = request.get_data() if request.method not in ['GET', 'HEAD', 'OPTIONS'] else None

        # The body is consumed after the session is closed, which is the same
        # pattern requests.request(..., stream=True) itself uses.
        with requests.Session() as s:
            target_response = s.request(
                method=request.method, url=target_full_url, headers=forward_headers,
                data=request_body, stream=True, timeout=REQUEST_TIMEOUT, allow_redirects=False
            )

        # --- Redirects -----------------------------------------------------
        if 300 <= target_response.status_code < 400 and 'Location' in target_response.headers:
            try:
                final_redirect_headers = {
                    'Location': _rewrite_redirect_location(
                        target_response.headers['Location'], target_hostname
                    )
                }
                for k, v in target_response.headers.items():
                    if k.lower() in _REDIRECT_PASSTHROUGH_HEADERS:
                        final_redirect_headers[k] = v
                return Response(response=None, status=target_response.status_code, headers=final_redirect_headers)
            finally:
                # stream=True keeps the connection open until closed explicitly.
                target_response.close()

        # --- Upstream errors -----------------------------------------------
        if target_response.status_code >= 400:
            try:
                error_headers = {
                    k: v for k, v in target_response.headers.items()
                    if k.lower() not in _EXCLUDED_ERROR_HEADERS
                }
                return Response(target_response.content, status=target_response.status_code, headers=error_headers)
            finally:
                target_response.close()

        # --- Successful response -------------------------------------------
        content_type = target_response.headers.get('Content-Type', 'application/octet-stream').lower()
        final_response_headers = {
            k: v for k, v in target_response.headers.items()
            if k.lower() not in _EXCLUDED_RESPONSE_HEADERS
        }
        final_response_headers['Content-Type'] = content_type
        final_response_headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0'
        final_response_headers['Pragma'] = 'no-cache'
        final_response_headers['Expires'] = '0'
        final_response_headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'

        def generate_response_content_stream_inner():
            """Yield the upstream body, rewriting it on the fly when it is HTML."""
            is_html = 'text/html' in content_type
            css_injected = False
            js_injected = False
            html_buffer = b''
            observer_script = ""
            proc_enc = 'utf-8'
            decoder = None

            if is_html:
                if SIMPLE_TEXT_REPLACEMENTS:
                    observer_script = _TEXT_REPLACER_JS_TEMPLATE.replace(
                        "{json_payload}", json.dumps(SIMPLE_TEXT_REPLACEMENTS)
                    )
                char_enc_match = re.search(r'charset=([\w-]+)', content_type, re.I)
                if char_enc_match:
                    proc_enc = char_enc_match.group(1)
                try:
                    decoder = codecs.getincrementaldecoder(proc_enc)(errors='replace')
                except LookupError:
                    # Unknown charset label: fall back instead of aborting
                    # the stream with an empty body.
                    proc_enc = 'utf-8'
                    decoder = codecs.getincrementaldecoder(proc_enc)(errors='replace')

            def transform(text, is_last):
                """Apply replacements, CSS/JS injection and branding removal."""
                nonlocal css_injected, js_injected
                if SIMPLE_TEXT_REPLACEMENTS:
                    text = replace_text_server_side(text, SIMPLE_TEXT_REPLACEMENTS)
                if not css_injected:
                    head_m = re.search(r'(<head\b[^>]*>)', text, re.I)
                    if head_m:
                        text = text[:head_m.end(1)] + GRADIO_HIDE_CSS_STANDARD + text[head_m.end(1):]
                        css_injected = True
                    elif is_last:
                        # No <head> was ever seen: prepend the CSS as a last resort.
                        text = GRADIO_HIDE_CSS_STANDARD + text
                        css_injected = True
                text = process_html_content_server_side_minimal(text)
                if observer_script and not js_injected:
                    script_present = is_last and re.search(
                        r'<script id="grProxyTextReplacerScript">', text, re.I
                    )
                    if not script_present:
                        body_end_m = re.search(r'(</body>)', text, re.I)
                        if body_end_m:
                            text = text[:body_end_m.start(1)] + observer_script + text[body_end_m.start(1):]
                            js_injected = True
                        elif is_last:
                            text += observer_script
                            js_injected = True
                return text

            try:
                for chunk in target_response.iter_content(chunk_size=8192, decode_unicode=False):
                    if not chunk:
                        continue
                    if not is_html:
                        yield chunk
                        continue
                    html_buffer += chunk
                    # Accumulate small chunks until there is enough markup to
                    # process or a structural closing tag has arrived.
                    if len(html_buffer) < 4096 and not (
                        b"</head>" in html_buffer or b"</body>" in html_buffer or b"</html>" in html_buffer
                    ):
                        continue
                    # The incremental decoder holds back a trailing partial
                    # multi-byte sequence until the next chunk completes it,
                    # so characters split across chunks are not corrupted.
                    str_buffer = decoder.decode(html_buffer)
                    html_buffer = b''
                    yield transform(str_buffer, False).encode(proc_enc, 'replace')
                if is_html:
                    # Flush what is left in the byte buffer and in the decoder.
                    tail_str = decoder.decode(html_buffer, final=True)
                    html_buffer = b''
                    if tail_str:
                        yield transform(tail_str, True).encode(proc_enc, 'replace')
            except Exception as e_stream:
                # Headers are already sent at this point; all that can be
                # done is to stop the stream.
                app.logger.error(f"Stream error for {target_full_url}: {e_stream}")
            finally:
                target_response.close()

        if request.method == 'HEAD':
            # No body is relayed, so release the upstream connection now.
            target_response.close()
            return Response(None, status=target_response.status_code, headers=final_response_headers)
        return Response(
            stream_with_context(generate_response_content_stream_inner()),
            status=target_response.status_code, headers=final_response_headers
        )
    except requests.exceptions.Timeout:
        app.logger.error(f"Timeout ({REQUEST_TIMEOUT}s) for {target_full_url}")
        return "Error: Request to target timed out.", 504
    except requests.exceptions.HTTPError as e_http:
        if e_http.response is None:
            return "Error fetching content from target.", 502
        error_headers = {
            k: v for k, v in e_http.response.headers.items()
            if k.lower() not in _EXCLUDED_ERROR_HEADERS
        }
        return Response(e_http.response.content, status=e_http.response.status_code, headers=error_headers)
    except requests.exceptions.RequestException as e_req:
        app.logger.error(f"RequestException for {target_full_url}: {e_req}")
        # The exception text embeds the secret target host/URL: keep it out
        # of the client-visible body.
        return "Error fetching content from target.", 502
    except Exception:
        app.logger.exception(f"Unexpected error proxying {target_full_url}")
        return "Unexpected server error.", 500
if __name__ == '__main__':
    # Script entry point: read the listen port and debug flag from the
    # environment and announce them.
    debug_mode = os.getenv("FLASK_DEBUG", "false").lower() == "true"
    port = int(os.getenv("PORT", 7860))
    print(f"INFO: Starting Flask app on host 0.0.0.0, port {port}, debug: {debug_mode}")
    # The built-in server is started only when WITH_GUNICORN is unset or empty.
    if os.getenv("WITH_GUNICORN"):
        pass
    else:
        app.run(host='0.0.0.0', port=port, debug=debug_mode)