Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import requests | |
| import os | |
| import uuid | |
# === CONFIG ===
# Base URL of the backend API; the API_URL environment variable overrides
# the localhost default.
API_URL = os.environ.get("API_URL", "http://localhost:8000")
# Headers attached to every request before any auth header is added.
DEFAULT_HEADERS = dict()

# NOTE(review): the leading glyphs in the title look like a mis-decoded
# emoji -- confirm against the original file encoding.
st.set_page_config(layout="wide", page_title="π§ LLaMA Local AI Agent")

# === SESSION STATE INIT ===
# Each entry is (state key, factory for its initial value). A key is only
# seeded when it is absent, so values survive Streamlit reruns.
_session_defaults = (
    ("token", lambda: ""),
    ("session_id", lambda: str(uuid.uuid4())),
)
for _state_key, _make_default in _session_defaults:
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()
| # === AUTH HELPERS === | |
def auth_headers():
    """Return the HTTP headers to send with a backend request.

    The result is always a fresh dict based on ``DEFAULT_HEADERS``. When a
    non-empty token is stored in ``st.session_state.token`` an
    ``Authorization: Bearer <token>`` entry is added (overriding any
    default with the same name).
    """
    token = st.session_state.token
    if not token:
        return DEFAULT_HEADERS.copy()
    return {**DEFAULT_HEADERS, "Authorization": f"Bearer {token}"}
def post(endpoint, data=None, files=None, json=None, timeout=300):
    """POST to the backend API and return the decoded JSON body.

    Args:
        endpoint: Path appended to ``API_URL`` (e.g. ``"/chat"``).
        data: Form fields, passed through to ``requests.post``.
        files: Multipart file mapping, passed through to ``requests.post``.
        json: JSON-serialisable body, passed through to ``requests.post``.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` waits indefinitely, which would freeze
            the Streamlit script run if the backend stalls.

    Returns:
        The decoded JSON response, or ``None`` when the request fails, the
        server answers with an error status, or the body is not valid
        JSON. In the failure case the error is shown with ``st.error``.
    """
    try:
        res = requests.post(
            f"{API_URL}{endpoint}",
            data=data,
            files=files,
            json=json,
            headers=auth_headers(),
            timeout=timeout,
        )
        res.raise_for_status()
        return res.json()
    # RequestException covers connection errors, timeouts and HTTP error
    # statuses; ValueError covers a body that is not valid JSON.
    except (requests.RequestException, ValueError) as e:
        st.error(f"β API POST Error: {e}")
        return None
def get(endpoint, params=None, timeout=300):
    """GET from the backend API and return the decoded JSON body.

    Args:
        endpoint: Path appended to ``API_URL`` (e.g. ``"/query_file"``).
        params: Query-string parameters, passed through to ``requests.get``.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` waits indefinitely, which would freeze
            the Streamlit script run if the backend stalls.

    Returns:
        The decoded JSON response, or ``None`` when the request fails, the
        server answers with an error status, or the body is not valid
        JSON. In the failure case the error is shown with ``st.error``.
    """
    try:
        res = requests.get(
            f"{API_URL}{endpoint}",
            params=params,
            headers=auth_headers(),
            timeout=timeout,
        )
        res.raise_for_status()
        return res.json()
    # RequestException covers connection errors, timeouts and HTTP error
    # statuses; ValueError covers a body that is not valid JSON.
    except (requests.RequestException, ValueError) as e:
        st.error(f"β API GET Error: {e}")
        return None
# === SIDEBAR AUTH ===
st.sidebar.title("π Authentication")

# === LOGIN ===
st.sidebar.subheader("Login")
username = st.sidebar.text_input("Username", key="login_username")
password = st.sidebar.text_input("Password", key="login_password", type="password")

if st.sidebar.button("Login"):
    # Credentials are sent as form fields (``data=``), not as a JSON body.
    login_response = post("/login", data=dict(username=username, password=password))
    got_token = bool(login_response) and "access_token" in login_response
    if not got_token:
        st.error("β Login failed. Check credentials.")
    else:
        # Keep the token in session state so auth_headers() can attach it,
        # then rerun so the logged-in UI is rendered.
        st.session_state.token = login_response["access_token"]
        # NOTE(review): the rerun below follows immediately, so this
        # message is probably never visible -- confirm.
        st.success("β Logged in!")
        st.rerun()
# === REGISTER ===
st.sidebar.markdown("---")
st.sidebar.subheader("π New here? Register")
new_username = st.sidebar.text_input("New Username", key="reg_username")
new_password = st.sidebar.text_input("New Password", key="reg_password", type="password")
confirm_password = st.sidebar.text_input("Confirm Password", key="reg_confirm", type="password")
# NOTE(review): the form lets any visitor choose the "admin" role; confirm
# that the backend validates/authorises the requested role.
role = st.sidebar.selectbox("Role", ["user", "admin"], key="reg_role")

if st.sidebar.button("Register"):
    passwords_match = new_password == confirm_password
    fields_filled = bool(new_username) and bool(new_password)
    if not passwords_match:
        st.sidebar.error("β Passwords do not match.")
    elif not fields_filled:
        st.sidebar.warning("Please fill all fields.")
    else:
        # Registration is sent as a JSON body (unlike login, which uses
        # form fields).
        reg_payload = dict(username=new_username, password=new_password, role=role)
        reg_result = post("/register", json=reg_payload)
        registered = bool(reg_result) and "access_token" in reg_result
        if registered:
            # A successful registration also logs the user in.
            st.session_state.token = reg_result["access_token"]
            st.sidebar.success("β Registered & Logged in!")
            st.rerun()
        else:
            st.sidebar.error("β Registration failed (User may exist).")
# === IF LOGGED IN ===
def _as_upload(uploaded):
    """Build a ``requests`` ``files`` mapping from a Streamlit upload.

    The bytes are taken with ``getvalue()`` instead of passing the stream
    itself: a stream is read to EOF by the first request, so posting the
    same upload twice (or again on a later rerun) would send an empty file.
    The upload's name is kept as the multipart filename.
    """
    return {"file": (uploaded.name, uploaded.getvalue())}


def _render_chat_tab():
    """Render the text chat tab: one prompt in, one AI reply out."""
    st.header("π¬ Chat with AI")
    user_input = st.text_input("Ask something...")
    if st.button("Send", key="chat") and user_input:
        result = post("/chat", json={
            "session_id": st.session_state.session_id,
            "user_message": user_input,
        })
        if result:
            st.markdown(f"**AI:** {result.get('bot_response')}")


def _render_file_qa_tab():
    """Render the upload tab: caption/OCR for images, Q&A for documents."""
    st.header("π Upload & Ask")
    uploaded = st.file_uploader(
        "Upload PDF, TXT or image", type=["pdf", "txt", "jpg", "jpeg", "png"]
    )
    if not uploaded:
        return
    st.success(f"β Uploaded: {uploaded.name}")

    if uploaded.type.startswith("image/"):
        st.image(uploaded)
        with st.spinner("Processing..."):
            # Two requests need the same bytes; _as_upload() gives each
            # one a full copy so the OCR call does not get an empty file.
            caption = post("/image-caption", files=_as_upload(uploaded))
            ocr = post("/ocr", files=_as_upload(uploaded))
        if caption:
            st.info(f"πΌοΈ Caption: {caption.get('caption')}")
        if ocr:
            st.info(f"π OCR: {ocr.get('text')}")
        return

    # NOTE(review): this upload is repeated on every script rerun while a
    # file is selected; consider caching the result in session state.
    upload_res = post("/upload", files=_as_upload(uploaded))
    if upload_res:
        question = st.text_input("Ask a question about the file")
        if st.button("Ask"):
            answer = get(
                "/query_file",
                params={"filename": uploaded.name, "question": question},
            )
            if answer:
                st.success(f"π‘ Answer: {answer.get('answer')}")


def _render_tools_tab():
    """Render the tools tab: run one named backend tool on a text input."""
    st.header("π οΈ AI Tools")
    tool_input = st.text_input("Enter input")
    tool = st.selectbox(
        "Select Tool", ["calc", "time", "joke", "search", "browse", "exec_python"]
    )
    if st.button("Run Tool"):
        result = post(f"/tool/{tool}", json={"prompt": tool_input})
        if result:
            st.markdown(f"π§ Result: {result.get('result')}")


def _render_image_tab():
    """Render the image tab: upload an image and request a caption."""
    st.header("πΌοΈ Caption Any Image")
    img_file = st.file_uploader("Upload an image", type=["jpg", "jpeg", "png"])
    if not img_file:
        return
    st.image(img_file, width=400)
    if st.button("Generate Caption"):
        # NOTE(review): the File Q&A tab posts images to "/image-caption"
        # while this tab uses "/caption_image" -- confirm both endpoints
        # exist on the backend.
        caption = post("/caption_image", files=_as_upload(img_file))
        if caption:
            st.success(f"π Caption: {caption.get('caption')}")


def _render_voice_tab():
    """Render the voice tab: transcribe audio, chat on it, speak the reply."""
    st.header("π€ Voice Chat with AI")
    voice_file = st.file_uploader("Upload MP3 or WAV", type=["mp3", "wav"])
    if not voice_file:
        return
    st.audio(voice_file)
    transcribed = post("/transcribe", files=_as_upload(voice_file))
    if not transcribed:
        return
    transcript = transcribed.get("transcription")
    st.success(f"π Transcription: {transcript}")

    if not st.button("Respond"):
        return
    if not transcript:
        # Avoids a KeyError / empty chat request when the backend returned
        # no "transcription" field.
        st.warning("No transcription text to respond to.")
        return

    # Same request shape as the Chat tab, which talks to the same endpoint.
    response = post("/chat", json={
        "session_id": st.session_state.session_id,
        "user_message": transcript,
    })
    if not response:
        return
    # The Chat tab reads "bot_response"; "response" is kept as a fallback
    # for the key this tab used previously.
    reply = response.get("bot_response", response.get("response"))
    st.success(f"π£οΈ AI says: {reply}")
    if not reply:
        return

    # The TTS endpoint returns raw audio rather than JSON, so it cannot go
    # through post(); guard against network errors and hangs here instead.
    try:
        tts = requests.post(
            f"{API_URL}/speak",
            params={"text": reply},
            headers=auth_headers(),
            timeout=300,
        )
    except requests.RequestException:
        tts = None
    if tts is not None and tts.status_code == 200:
        st.audio(tts.content)
    else:
        st.error("β TTS failed.")


def _render_agent_tab():
    """Render the agent tab: run an agent task and export chat history."""
    st.header("π€ AI Agent")
    task = st.text_input("Enter an agent task")
    if st.button("Run Agent"):
        result = post("/agent", json={"prompt": task})
        if result:
            st.markdown(f"π οΈ Result: {result.get('result')}")
    st.divider()
    if st.button("π Export Chat History"):
        history = get("/history/export")
        if history:
            st.text_area("History", history.get("text", ""), height=300)


def _render_email_tab():
    """Render the email tab: generate a promotional email from form inputs."""
    st.header("π§ Promo Email Builder")
    product = st.text_input("Product")
    recipient = st.text_input("Recipient Email")
    discount = st.slider("Discount (%)", 5, 80, 15)
    if st.button("Generate Email"):
        email = get(
            "/generate_email",
            params={"to": recipient, "product": product, "discount": discount},
        )
        if email:
            st.code(email.get("email"))


if st.session_state.token:
    tabs = st.tabs(["π¬ Chat", "π File Q&A", "π§ Tools", "πΌοΈ Image AI", "π€ Voice", "π€ Agent", "π§ Email"])
    # Renderers are listed in the same order as the tab labels above.
    _tab_renderers = (
        _render_chat_tab,
        _render_file_qa_tab,
        _render_tools_tab,
        _render_image_tab,
        _render_voice_tab,
        _render_agent_tab,
        _render_email_tab,
    )
    for _tab, _render in zip(tabs, _tab_renderers):
        with _tab:
            _render()
else:
    st.warning("π Please login or register using the sidebar to access the AI features.")