Spaces:
Sleeping
Sleeping
File size: 8,952 Bytes
f6f067a 9ed40f1 d1dc380 f6f067a cd3e819 f6f067a cd3e819 f6f067a cd3e819 f6f067a cd3e819 f6f067a cd3e819 f6f067a cd3e819 f6f067a 3449d98 cd3e819 f6f067a cd3e819 f6f067a 3449d98 11b79d3 cd3e819 11b79d3 cd3e819 11b79d3 cd3e819 11b79d3 cd3e819 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
import gradio as gr
import requests
import os
import io
from PIL import Image
import random
from datetime import datetime, timedelta
import hashlib
import threading
import tempfile
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
# Путь к временному кэшу
CACHE_DIR = os.path.join(tempfile.gettempdir(), "image_cache")
if not os.path.exists(CACHE_DIR):
os.makedirs(CACHE_DIR)
# Время жизни кэша в минутах
CACHE_DURATION = 45
# Функция для очистки кэша
def clear_cache():
current_time = datetime.now()
for filename in os.listdir(CACHE_DIR):
file_path = os.path.join(CACHE_DIR, filename)
if os.path.isfile(file_path):
file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
if current_time - file_time > timedelta(minutes=CACHE_DURATION):
os.remove(file_path)
# Запуск очистки кэша каждые 45 минут
def start_cache_cleanup():
while True:
clear_cache()
threading.Event().wait(CACHE_DURATION * 60)
# Запуск потока для очистки кэша
cleanup_thread = threading.Thread(target=start_cache_cleanup, daemon=True)
cleanup_thread.start()
# Функция для сохранения изображения в кэш и возврата пути к нему
def save_image_to_cache(image):
image_hash = hashlib.md5(image.tobytes()).hexdigest()
image_path = os.path.join(CACHE_DIR, f"{image_hash}.jpg")
if not os.path.exists(image_path):
image.save(image_path, format="JPEG")
return image_path
# Функция для сжатия изображения
def compress_image(image, max_size=(800, 800)):
image.thumbnail(max_size, Image.LANCZOS)
return image
# Функция для получения прямой ссылки на изображение в кэше
def get_image_url(image_path, server_url):
return f"{server_url}/image/{os.path.basename(image_path)}"
# Функция для отправки запроса в OpenAI с изображением и получения ответа
def ask_openai_with_image(messages, instruction, image, server_url):
if not instruction and image is None:
emj = random.choice(emojis)
raise gr.Error(f"{emj} Заполните, пожалуйста, хотя бы одно поле")
new_message = {
"role": "user",
"content": [
{
"type": "text",
"text": instruction if instruction else "",
}
]
}
if image is not None:
# Сжимаем изображение
compressed_image = compress_image(image)
# Сохраняем изображение в кэш
image_path = save_image_to_cache(compressed_image)
# Получаем прямую ссылку на изображение
image_url = get_image_url(image_path, server_url)
new_message["content"].append({
"type": "image_url",
"image_url": {
"url": image_url,
"detail": "high",
},
})
messages.append(new_message)
payload = {
"model": "learnlm-1.5-pro-experimental",
"messages": messages,
"max_tokens": 4095,
}
# Заголовки для запроса
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}'
}
# URL для запроса к API OpenAI
url = BASE_URL
# Отправляем запрос в OpenAI с таймаутом 225 секунд
try:
response = requests.post(url, headers=headers, json=payload, timeout=225)
except requests.Timeout:
messages.append({
"role": "assistant",
"content": "Ошибка: Запрос к ИИ превысил максимальное время ожидания. Повторите попытку позже."
})
return messages, "Ошибка: Запрос к ИИ превысил максимальное время ожидания. Повторите попытку позже."
except requests.RequestException as e:
messages.append({
"role": "assistant",
"content": f"Ошибка при отправке запроса, повторите попытку позже."
})
return messages, f"Ошибка при отправке запроса, повторите попытку позже."
# Проверяем ответ и возвращаем результат
if response.status_code == 200:
response_json = response.json()
try:
# Пытаемся извлечь текст из ответа
otvet = response_json["choices"][0]["message"]["content"]
messages.append({
"role": "assistant",
"content": otvet
})
return messages, messages
except Exception as e:
# Если есть ошибка в структуре JSON, выводим ее
messages.append({
"role": "assistant",
"content": f"Ошибка обработки ответа: {e}"
})
return messages, f"Ошибка обработки ответа: {e}"
else:
# Если произошла ошибка, возвращаем сообщение об ошибке
messages.append({
"role": "assistant",
"content": f"Ошибка: {response.status_code} - {response.text}"
})
return messages, f"Ошибка: {response.status_code} - {response.text}"
emojis = ['😊', '🤗', '🥺', '😅', '🤭', '😔', '✨', '😜', '🙏']
api_key = os.getenv("OPENAI_API_KEY")
BASE_URL = os.getenv("BASE_URL")
SERVER_URL = os.getenv("SERVER_URL", "http://localhost:7860") # Убедитесь, что этот URL правильный
# Текст начального сообщения
start = "Приветствую тебя! 🌟 Ты - \"Помогатор 1.5\". Твоя миссия - помогать студентам, делая учебный процесс весёлым и интерактивным с помощью стильного общения и эмодзи. 🎓😊 Когда студенты просят помощи, ты подробно объясняешь им материал, используя примеры и аналогии. Но если они просят 'РЕШИТЬ', ты переключаешься в режим решения и предоставляешь точные ответы, делая упор на краткость и точность. 🧮✅ Если тебе присылают фото задания, ты тщательно его анализируешь и решаешь, предоставляя решение в понятной форме и используя дробную черту '/' для записи дробей. 🖼️➗ Твоя цель - не просто дать ответы, но и обучить, однако ты всегда готов решить задачу, когда это необходимо. Приступим? 🚀🌈"
# Начальные сообщения
initial_messages = [
{
"role": "system",
"content": start,
}
]
# Создаем интерфейс с помощью Gradio
with gr.Blocks() as demo:
with gr.Row():
with gr.Column():
chatbot = gr.Chatbot(label="История Сообщений", type='messages')
with gr.Row():
image_input = gr.Image(label="Фото", type="pil")
instructions = gr.Textbox(label="Сообщение", lines=3, placeholder="Реши...")
submit_button = gr.Button("Отправить")
submit_button.click(
fn=ask_openai_with_image,
inputs=[gr.State(initial_messages), instructions, image_input, gr.Textbox(SERVER_URL, visible=False)],
outputs=[gr.State(initial_messages), chatbot],
concurrency_limit=150,
show_progress=True
)
# Монтируем маршрут для получения изображений из кэша
app = demo.app
app.mount("/image", StaticFiles(directory=CACHE_DIR), name="image_cache")
# Обработка маршрута для получения изображений из кэша
@app.get("/image/{image_name:path}")
async def get_image(image_name: str):
image_path = os.path.join(CACHE_DIR, image_name)
if os.path.isfile(image_path):
return FileResponse(image_path)
else:
raise HTTPException(status_code=404, detail="Image not found")
demo.launch()
|