from flask import Flask, request, jsonify, send_from_directory
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import google.generativeai as genai
from PIL import Image
from io import BytesIO
from prodiapy import Prodia
import requests
import os
import psutil
import time
import datetime
import json
import subprocess
import string
import random
from g4f.client import Client
import tempfile
from huggingface_hub import HfApi, login
import threading
client = Client()
app = Flask(__name__)

HF_TOKEN = os.environ.get('HF_TOKEN')
login(token=HF_TOKEN)
api = HfApi()

# Rate-limit clients by remote address, 30 requests per minute by default.
limiter = Limiter(
    get_remote_address,
    app=app,
    default_limits=["30 per minute"]
)
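# Illustrative only (no per-route overrides appear in this file): flask-limiter's
# limiter.limit() decorator can tighten the limit on an individual view, e.g.
#     @limiter.limit("5 per minute")
# stacked above a specific view function.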

# Define the path to static files
static_dir = os.path.dirname(os.path.abspath(__file__))

def css(filename):
    return send_from_directory(os.path.join(static_dir, 'css'), filename)

def js(filename):
    return send_from_directory(os.path.join(static_dir, 'js'), filename)

def img(filename):
    return send_from_directory(os.path.join(static_dir, 'img'), filename)

def index():
    return send_from_directory(static_dir, 'index.html')

def ai():
    return send_from_directory(os.path.join(static_dir, 'views'), 'ai.html')
def ai_file(filename):
    if filename.endswith('.py'):
        with open(os.path.join(static_dir, 'ai', filename), 'r') as f:
            code = f.read()
        # Execute the Python file's contents in a subprocess and return its output.
        output = subprocess.check_output(["python", "-c", code], stderr=subprocess.STDOUT)
        return output.decode('utf-8')
    else:
        return send_from_directory(os.path.join(static_dir, 'ai'), filename)

def info():
    ip = request.remote_addr
    current_time = datetime.datetime.now().strftime("%H:%M:%S")
    return jsonify({'ip': ip, 'current_time': current_time})

# Define the visitor count routes
visitor_count = 0
visitor_today = 0
last_update_date = datetime.datetime.now().date()
visitor_total = 0

def reset_visitor_count():
    # Assumed helper (its definition is not shown in the original listing):
    # clears the per-day counter referenced below.
    global visitor_today
    visitor_today = 0

def update_visitor_counts():
    global visitor_count, visitor_today, last_update_date, visitor_total
    allowed_paths = ['/ai', '/api', '/tool']
    if request.path.startswith(tuple(allowed_paths)):
        current_date = datetime.datetime.now().date()
        if current_date != last_update_date:
            visitor_today = 0
            last_update_date = current_date
        visitor_count += 1
        visitor_today += 1
        visitor_total += 1
        if datetime.datetime.now().hour == 0 and datetime.datetime.now().minute == 0:
            reset_visitor_count()
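# How this counter is attached to requests is not shown in the original listing;
# a typical hookup would be Flask's before-request mechanism (assumption):
#     app.before_request(update_visitor_counts)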

def count():
    return jsonify({
        'visitor_count': visitor_count,
        'visitor_today': visitor_today,
        'visitor_total': visitor_total
    })

# Define the status route
def status():
    uptime_seconds = int(time.time() - psutil.boot_time())
    uptime = str(datetime.timedelta(seconds=uptime_seconds))
    memory_free = psutil.virtual_memory().available
    memory_total = psutil.virtual_memory().total
    return jsonify({'runtime': uptime, 'memory': f'{memory_free} / {memory_total}'})

# Handle 404 errors
def page_not_found(e):
    return send_from_directory(static_dir, '404.html'), 404
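# The handler's registration is not shown here; Flask's usual mechanisms would be
# @app.errorhandler(404) on the function above or, equivalently (assumption):
#     app.register_error_handler(404, page_not_found)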

apiKeys = [
    "f5282cab-1ced-4b6e-80f4-11b2be59af01",
    "2021e94a-1385-4ddc-905b-c050cfb5af32",
    "0bfe0e6d-6bf9-4984-ab07-3a9410a551ad",
    "1452e7a5-d6e2-4600-9641-1c2debde397a",
    "f4b18c3c-ea4d-4b18-be47-f5ad29d70936",
    "688659c2-b2e9-4524-8a91-1c72735ec068",
    "aa64f14e-18d8-44df-91cc-6d4e20051ca3",
    "7440ab53-6c97-40bc-aad4-50a93e753256",
    "65f9a7e7-bcf5-4f21-8715-64cdbc3adbdf",
    "cc9e0170-ffc8-43e0-b01c-eb4847158a72",
    "ee39894a-3f05-4fca-8d6b-2eaf6443c2d5",
    "ef9d2480-a655-490e-983a-2a448ead257c",
    "a888afb5-2e90-4fc0-bb1a-617ba4a24c2f",
    "201a4f06-ac0d-4877-a8d6-c346d7dc1c9f",
    "9f604055-793a-4a2f-a528-c7fe283f0fa9"
]

# Load styles from style.json file
with open("style.json", "r") as style_file:
    styleList = json.load(style_file)

def getRandomApiKey():
    # Pick a random Prodia API key from the pool above
    return random.choice(apiKeys)

def getRandomSeed():
    # Random seed in roughly the 64-bit range
    return random.randint(1, 18446744073709552000)

def getAvailableStyles():
    return ', '.join([style["name"] for style in styleList])

prodia = Prodia(getRandomApiKey())

def get_styles():
    with open("style.json", "r") as style_file:
        styles = json.load(style_file)
    return jsonify({'status': 'success', "styles": [style["name"] for style in styles]})

def upload_image():
    try:
        # Get the URL parameter
        url = request.args.get('url')
        if not url:
            return jsonify({'error': 'URL parameter is missing'}), 400
        # Download the image
        response = requests.get(url)
        if response.status_code != 200:
            return jsonify({'error': 'Failed to download image'}), 400
        image_name = "image.png"
        # Save the image
        img = Image.open(BytesIO(response.content))
        img.save(image_name, "PNG")
        # Send the image to Discord
        discord_webhook_url = "https://discord.com/api/webhooks/1217109788656406588/sh0LG9VH5wmxSWP8OBwfHxfbbMHleUX6eQ8-xULIEo5m4IASfNm7jCNrZFZZweKaNGTM"
        with open(image_name, 'rb') as f:
            webhook_response = requests.post(discord_webhook_url, files={'file': f})
        # Get the uploaded image URL from Discord CDN
        discord_cdn_url = webhook_response.json().get('attachments', [{}])[0].get('url')
        # Delete the temporary image file
        os.remove(image_name)
        return jsonify({
            'success': 'Image uploaded and sent to Discord',
            'discord_cdn_url': discord_cdn_url
        }), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
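# Example call (the URL rule for this view is not shown in the original listing,
# so the path is a placeholder):
#     GET /<upload-route>?url=https://example.com/picture.png
# On success the JSON response carries the Discord CDN URL of the re-hosted file.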

def generate_image():
    try:
        data = request.json
        prompt = data.get('prompt', '')
        userStyle = data.get('style')
        seed = int(data.get('seed', getRandomSeed()))
        guidance_scale = int(data.get('guidance_scale', 0))
        if not userStyle:
            return jsonify({"status": "error", "error": "Style is required. Available styles: " + getAvailableStyles()}), 400
        selectedStyle = next((style for style in styleList if style["name"].lower() == userStyle.lower()), None)
        if not selectedStyle:
            return jsonify({"status": "error", "error": "Invalid style. Available styles: " + getAvailableStyles()}), 400
        if guidance_scale and (guidance_scale < 1 or guidance_scale > 100):
            return jsonify({"status": "error", "error": "guidance_scale must be an integer between 1 and 100."}), 400
        job = prodia.sdxl.generate(
            prompt=selectedStyle["prompt"].replace('{prompt}', prompt),
            model="sd_xl_base_1.0.safetensors [be9edd61]",
            negative_prompt=selectedStyle["negative_prompt"] + ", duplicate",
            sampler="DPM++ 2M Karras",
            cfg_scale=selectedStyle.get('cfg_scale', 7),
            steps=selectedStyle.get('steps', 20),
            height=1024,
            width=1024)
        wait = prodia.wait(job)
        url = wait.image_url
        # Re-host the generated image through a Discord webhook
        discord_webhook_url = "https://discord.com/api/webhooks/1217084642675654717/FpiXr5sLPmFNZ0xDz5HNClwn6NCYNmL2JvwdcGwb7V9FZd9bdPfSPZR41HmGzGD3uR8d"
        image_name = "invite_1080035826051854356_best_bot_ever.png"
        img = Image.open(BytesIO(requests.get(url).content))
        img.save(image_name, "PNG")
        with open(image_name, 'rb') as f:
            webhook_response = requests.post(discord_webhook_url, files={'file': f})
        # Print the response for debugging
        print(webhook_response.text)
        # Check if the request was successful
        if webhook_response.status_code == 200:
            discord_cdn_url = webhook_response.json().get('attachments', [{}])[0].get('url')
            # Remove the temporary image file
            os.remove(image_name)
            # Return the success response with the generated image URL
            return jsonify({
                'status': 'success',
                'url': discord_cdn_url
            }), 200
        else:
            # If the request to the webhook fails, return an error response
            return jsonify({"status": "error", "error": "Failed to send image through webhook"}), 500
    except Exception as e:
        print('Error:', str(e))
        return jsonify({"status": "error", "error": "Internal Server Error"}), 500
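# Example request body ("style" must match a name from style.json; "seed" and
# "guidance_scale" are optional):
#     {"prompt": "a castle at sunset", "style": "<style name>", "seed": 12345}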

genai.configure(api_key="AIzaSyBPIdkEyVTDZnmXrBi4ykf0sOfkbOvxAzo")

DEFAULT_MODELS = ["gemini-1.0-pro", "gemini-1.0-pro-001"]

def gemini():
    data = request.json
    prompt = data.get('prompt')
    model_name = data.get('model')
    messages = data.get('messages', [])
    if not prompt:
        return jsonify({'error': 'Prompt parameter is required'}), 400
    if model_name and model_name not in DEFAULT_MODELS:
        return jsonify({'error': f'Model {model_name} not found'}), 400
    try:
        # Use the specified model or the default model
        selected_model = model_name if model_name in DEFAULT_MODELS else DEFAULT_MODELS[0]
        # Set up the selected model
        generation_config = {
            "temperature": 1,
            "top_p": 1,
            "top_k": 1,
            "max_output_tokens": 2048,
        }
        safety_settings = [
            {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
            {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
        ]
        model = genai.GenerativeModel(
            model_name=selected_model,
            generation_config=generation_config,
            safety_settings=safety_settings
        )
        # Start the conversation and generate a response
        convo = model.start_chat(history=messages)
        convo.send_message(prompt)
        response = convo.last.text
        return jsonify({'status': 'success', 'response': response})
    except Exception as e:
        error_message = str(e)
        app.logger.error("Failed to generate content: %s", error_message)
        return jsonify({'status': 'error', 'error': 'Failed to generate content.'}), 500
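# Note on "messages": google.generativeai's start_chat(history=...) expects
# entries shaped roughly like {"role": "user" | "model", "parts": ["..."]};
# that shape comes from the library's conventions, not from anything documented
# in this file.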

TOKEN_MESSAGES_TMP = {}

def generate_chat_id():
    """Generate a random chat ID of the form Kastg_<15-30 alphanumeric characters>."""
    return "Kastg_" + ''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(15, 30)))

def chat_tmp():
    data = request.json
    messages = data.get('messages')
    chat_id = data.get('chat-id')
    if not messages:
        return jsonify({'status': 'error', 'error': 'Messages parameter is required'}), 400
    if not chat_id:
        # Generate a new chat ID
        chat_id = generate_chat_id()
    elif not chat_id.startswith('Kastg_'):
        return jsonify({'status': 'error', 'error': 'Chat ID must start with "Kastg_"'}), 400
    elif len(chat_id) <= 13:
        return jsonify({'status': 'error', 'error': 'Chat ID must have more than 7 characters after Kastg_'}), 400
    # Save messages under the chat ID
    if chat_id not in TOKEN_MESSAGES_TMP:
        TOKEN_MESSAGES_TMP[chat_id] = []
    for message in messages:
        TOKEN_MESSAGES_TMP[chat_id].append(message)
    # Return the chat ID and the saved messages
    response_data = {'creator': 'api.Kastg.com', 'status': 'success', 'chat-id': chat_id, 'messages': TOKEN_MESSAGES_TMP[chat_id]}
    return jsonify(response_data)
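# Example request body (a new "chat-id" is generated when the field is omitted;
# the role/content message shape below is illustrative only, since this endpoint
# stores whatever message objects it receives):
#     {"messages": [{"role": "user", "content": "Hello"}]}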

def handle_message():
    try:
        # Get the data from the request JSON
        data = request.json
        messages = data.get('messages', [])
        model = data.get('model', 'gpt-4o-mini')
        if not messages:
            return jsonify({"error": "No messages provided"}), 400
        # Validate the structure of messages
        for message in messages:
            if 'role' not in message or 'content' not in message:
                return jsonify({"error": "Invalid message format"}), 400
        # Use the g4f client to get a response
        response = client.chat.completions.create(
            model=model,
            messages=messages
        )
        # Extract the response content
        ai_response = response.choices[0].message.content
        # Return the response as JSON
        return jsonify({"response": ai_response})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
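# Example request body (each message must carry "role" and "content", as
# validated above):
#     {"model": "gpt-4o-mini", "messages": [{"role": "user", "content": "Hello"}]}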

def make_text():
    query = request.args.get('query')
    file_name = request.args.get('fileName')
    repo_id = request.args.get('repoId')
    repo_type = request.args.get('repoType', 'dataset')
    if not query or not file_name or not repo_id:
        return "Parameters 'query', 'fileName', and 'repoId' are required", 400
    if repo_type not in ['space', 'dataset', 'model']:
        return "Invalid 'repoType'. Must be 'space', 'dataset', or 'model'", 400
    # Create a temporary file holding the text to upload
    with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file:
        temp_file.write(query)
        temp_file_path = temp_file.name
    try:
        # Upload the temporary file to Hugging Face
        api.upload_file(
            path_or_fileobj=temp_file_path,
            path_in_repo=file_name,
            repo_id=repo_id,
            repo_type=repo_type,
        )
        return f"File '{file_name}' uploaded successfully to {repo_id} ({repo_type})", 200
    except Exception as e:
        return f"Error uploading file: {str(e)}", 500
    finally:
        # Clean up the temporary file
        os.unlink(temp_file_path)
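# Example call (query-string parameters; the URL rule for this view is not shown
# in the original listing):
#     GET /<make-text-route>?query=hello+world&fileName=notes/hello.txt&repoId=user/my-dataset&repoType=dataset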

def delete_file_after_delay(file_path, delay_seconds):
    def delete_file():
        time.sleep(delay_seconds)
        try:
            os.unlink(file_path)
            print(f"Temporary file {file_path} deleted after {delay_seconds} seconds.")
        except Exception as e:
            print(f"Error deleting temporary file {file_path}: {str(e)}")
    thread = threading.Thread(target=delete_file)
    thread.start()

def upload_image_dataset():
    data = request.json
    url = data.get('url')
    file_name = data.get('fileName')
    repo_id = data.get('repoId')
    repo_type = data.get('repoType', 'dataset')
    is_forwarded = data.get('is_forwarded', False)
    if not url or not file_name or not repo_id:
        return jsonify({"error": "Parameters 'url', 'fileName', and 'repoId' are required"}), 400
    if repo_type not in ['space', 'dataset', 'model']:
        return jsonify({"error": "Invalid 'repoType'. Must be 'space', 'dataset', or 'model'"}), 400
    try:
        # Download the image
        headers = {}
        if is_forwarded:
            headers['Authorization'] = f'Bearer {HF_TOKEN}'
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        # Create a temporary file
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_file.write(response.content)
            temp_file_path = temp_file.name
        # Schedule file deletion after 1 minute
        delete_file_after_delay(temp_file_path, 60)
        # Upload the file to Hugging Face
        api.upload_file(
            path_or_fileobj=temp_file_path,
            path_in_repo=file_name,
            repo_id=repo_id,
            repo_type=repo_type,
        )
        return jsonify({
            "message": f"File '{file_name}' uploaded successfully to {repo_id} ({repo_type})",
            "size": len(response.content)
        }), 200
    except requests.RequestException as e:
        return jsonify({"error": f"Error downloading image: {str(e)}"}), 500
    except Exception as e:
        return jsonify({"error": f"Error uploading file: {str(e)}"}), 500
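# The original listing shows no URL rules or @app.route decorators for the view
# functions above. A hypothetical registration block (every path here is an
# assumption, not taken from the source) might look like:
#     app.add_url_rule('/', view_func=index)
#     app.add_url_rule('/css/<path:filename>', view_func=css)
#     app.add_url_rule('/js/<path:filename>', view_func=js)
#     app.add_url_rule('/img/<path:filename>', view_func=img)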

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860, debug=True)