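"""Gradio RAG chatbot for Q&A about Matthew Schulz.

Retrieves the most relevant knowledge-base chunks with a SentenceTransformer
embedding model, streams answers from an OpenAI-compatible inference API, and
optionally uploads chat transcripts to Google Drive.
"""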
import os
import json
import base64
import uuid
import logging
from typing import Generator, List, Dict, Tuple, Optional

import numpy as np
from sentence_transformers import SentenceTransformer
from openai import OpenAI
import gradio as gr
from dotenv import load_dotenv

from utils.utils import (
    get_keys_chunks,
    get_docs,
    get_top_chunk_keys,
    get_messages,
    load_knowledge_base,
)
from utils.chatLogger import ChatUploader

# --------------- Logging ---------------
def _setup_logging() -> logging.Logger:
    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s:%(name)s:%(message)s",
    )
    return logging.getLogger(__name__)

# --------------- Initialization ---------------
def _require_env(var: str) -> str:
    val = os.getenv(var)
    if not val:
        raise RuntimeError(f"Missing required environment variable: {var}")
    return val
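
# Expected environment (typically from a local .env file; the values shown are
# placeholders, not real credentials):
#   INFERENCE_API_KEY=sk-...                                # required
#   GOOGLE_DRIVE_SERVICE_ACCOUNT_CREDENTIALS_BASE64=eyJ...  # optional: enables transcript uploads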

def initialize() -> Tuple[
    SentenceTransformer,
    List[Tuple[str, np.ndarray]],
    Dict,
    OpenAI,
    logging.Logger,
    Optional[ChatUploader],
]:
    logger = _setup_logging()
    logger.info("Initializing application...")
    load_dotenv(override=False)
    logger.info(".env loaded (override=False)")

    embedding_model_path = "ibm-granite/granite-embedding-125m-english"
    embedding_model = SentenceTransformer(embedding_model_path)
    logger.info("Embedding model loaded: %s", embedding_model_path)

    knowledge_base = load_knowledge_base()
    logger.info("Knowledge base loaded")

    pairs = list(get_keys_chunks(knowledge_base))
    if not pairs:
        raise RuntimeError("Knowledge base is empty; no chunks to encode.")
    keys, chunks = zip(*pairs)
    logger.info("KB chunks extracted: %d", len(chunks))

    chunks_encoded = embedding_model.encode(
        list(chunks),
        batch_size=64,
        convert_to_numpy=True,
        show_progress_bar=False,
    )
    keys_chunks_encoded = list(zip(keys, chunks_encoded))
    logger.info("KB chunks encoded: %d", len(keys_chunks_encoded))

    inference_api_key = _require_env("INFERENCE_API_KEY")
    openai_client = OpenAI(
        base_url="https://api.inference.net/v1",
        api_key=inference_api_key,
    )
    logger.info("OpenAI client initialized (base_url=api.inference.net)")

    chat_uploader: Optional[ChatUploader] = None
    drive_creds_b64 = os.getenv("GOOGLE_DRIVE_SERVICE_ACCOUNT_CREDENTIALS_BASE64")
    if drive_creds_b64:
        try:
            service_account_json = json.loads(
                base64.b64decode(drive_creds_b64).decode()
            )
            chat_uploader = ChatUploader(service_account_json)
            logger.info("Google Drive uploader configured")
        except Exception as e:
            logger.warning(
                "Google Drive uploader not configured (error parsing creds): %s", e
            )
            chat_uploader = None
    else:
        logger.info("Google Drive uploader not configured (no creds env var)")

    logger.info("Initialization complete")
    return (
        embedding_model,
        keys_chunks_encoded,
        knowledge_base,
        openai_client,
        logger,
        chat_uploader,
    )
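
# Assumed shapes for the utils.utils helpers (defined outside this file):
#   get_keys_chunks(kb)  -> iterable of (chunk_key, chunk_text) pairs
#   get_top_chunk_keys() -> chunk keys ranked by similarity of their embeddings
#                           to the encoded user query (top_n best matches)
#   get_docs(keys, kb)   -> the documents in `kb` referenced by those keys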

(
    embedding_model,
    keys_chunks_encoded,
    knowledge_base,
    openai_client,
    logger,
    chat_uploader,
) = initialize()

# --------------- Helpers ---------------
def _strip_think_tags(text: str) -> str:
    """Drop <think>/</think> markers so they never surface in the chat UI."""
    return text.replace("<think>", "").replace("</think>", "")

def _to_minimal(history: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Keep only role/content keys to avoid metadata/options noise in uploads."""
    minimal: List[Dict[str, str]] = []
    for m in history:
        role = m.get("role")
        content = m.get("content", "")
        if role is None:
            # ignore malformed entries
            continue
        minimal.append({"role": role, "content": content})
    return minimal
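
# Example: _to_minimal([{"role": "user", "content": "hi", "metadata": {"x": 1}}])
#          returns [{"role": "user", "content": "hi"}]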

# --------------- RAG Chatbot ---------------
def rag_chatbot(
    user_message: str,
    chat_history: List[Dict[str, str]],
    browser_id: str,
) -> Generator[List[Dict[str, str]], None, None]:
    """
    Stream assistant output as a single growing message dict.
    Do NOT mutate chat_history; Gradio manages it for type="messages".
    """
    # RAG retrieval
    try:
        logger.info("RAG: encoding query & retrieving docs")
        user_query_encoded = embedding_model.encode(
            [user_message], convert_to_numpy=True
        )[0]
        top_chunk_keys = get_top_chunk_keys(
            user_query_encoded, keys_chunks_encoded, top_n=5
        )
        docs = get_docs(top_chunk_keys, knowledge_base)
        logger.info("RAG: docs retrieved=%d (top_n=5)", len(docs))
    except Exception as e:
        logger.error("RAG: retrieval failed: %s", e)
        yield [
            {
                "role": "assistant",
                "content": "⚠️ An error occurred during document retrieval. Please try again later.",
            }
        ]
        return

    # LLM stream
    try:
        logger.info(
            "LLM: opening streaming completion (model=mistralai/mistral-nemo-12b-instruct/fp-8)"
        )
        messages = get_messages(docs, user_message, chat_history)
        chat_stream = openai_client.chat.completions.create(
            model="mistralai/mistral-nemo-12b-instruct/fp-8",
            messages=messages,
            stream=True,
        )
        logger.info("LLM: stream opened")
    except Exception as e:
        logger.error("LLM: API call failed: %s", e)
        yield [
            {
                "role": "assistant",
                "content": "⚠️ An error occurred while calling the model API. Please try again later.",
            }
        ]
        return

    # Stream parse → yield a single growing assistant message
    assistant_msg = {"role": "assistant", "content": ""}
    try:
        logger.info("LLM: streaming started")
        buffer = ""
        chunks_seen = 0
        content_events = 0
        chars_emitted = 0
        for chunk in chat_stream:
            chunks_seen += 1
            choices = getattr(chunk, "choices", None)
            if not choices:
                continue
            delta = getattr(choices[0], "delta", None)
            if not delta:
                continue
            piece = getattr(delta, "content", None)
            if piece is None:
                continue
            piece = _strip_think_tags(piece)
            if not piece:
                continue
            content_events += 1
            buffer += piece
            # Flush in ~24-char batches (or on newline) to limit UI updates
            if len(buffer) >= 24 or "\n" in buffer:
                assistant_msg["content"] += buffer
                chars_emitted += len(buffer)
                yield [assistant_msg]  # append/update single assistant bubble
                buffer = ""
        if buffer:
            assistant_msg["content"] += buffer
            chars_emitted += len(buffer)
            yield [assistant_msg]
        logger.info(
            "LLM: streaming finished (chunks=%d, content_events=%d, chars=%d)",
            chunks_seen,
            content_events,
            chars_emitted,
        )
    except Exception as e:
        logger.error("LLM: streaming failed: %s", e)
        if assistant_msg["content"]:
            assistant_msg[
                "content"
            ] += "\n\n⚠️ An error occurred during LLM response streaming. Please try again later."
            yield [assistant_msg]
        else:
            yield [
                {
                    "role": "assistant",
                    "content": "⚠️ An error occurred during LLM response streaming. Please try again later.",
                }
            ]
        return

    # --- Upload transcript (optional): reconstruct the current turn explicitly
    try:
        if chat_uploader is not None:
            # Gradio passes prior turns in `chat_history`. Build the latest full transcript.
            prior = _to_minimal(chat_history)
            current_user = {"role": "user", "content": user_message}
            final_history = prior + [
                current_user,
                {"role": "assistant", "content": assistant_msg["content"]},
            ]
            # Ensure we have a usable browser_id for the filename
            if not browser_id:
                browser_id = str(uuid.uuid4())
            drive_filename = f"chat__{browser_id}.json"
            logger.info(
                "Upload: writing Drive file '%s' (messages=%d, mode=overwrite)",
                drive_filename,
                len(final_history),
            )
            chat_uploader.upload_chat_history(
                final_history,
                browser_id,
                filename=drive_filename,
                mode="overwrite",  # overwrite-by-name semantics
            )
            logger.info("Upload: completed")
        else:
            logger.info("Upload: skipped (uploader not configured)")
    except Exception as e:
        logger.warning("Upload: failed (non-fatal): %s", e)

# --------------- Gradio app ---------------
with gr.Blocks() as demo:
    browser_id_state = gr.BrowserState(default_value=None)

    def load_browser_id(current_id):
        if current_id is None or current_id == "":
            new_id = str(uuid.uuid4())
            logger.info("Browser ID created: %s", new_id)
            return new_id
        logger.info("Browser ID reused: %s", current_id)
        return current_id

    gr.ChatInterface(
        fn=rag_chatbot,
        title="Matthew Schulz's RAG Chatbot 💬🤖",
        additional_inputs=browser_id_state,
        type="messages",
        examples=[
            ["What is Matthew's educational background?", None],
            [
                "What is Matthew's current role at Visa and what problems is he solving?",
                None,
            ],
            [
                "What machine learning projects has Matthew worked on and what were the outcomes?",
                None,
            ],
            ["What research did Matthew conduct at the USC AutoDrive Lab?", None],
            [
                "What did Matthew accomplish as Lead Software Engineer at SchedGo (now EduRoute)?",
                None,
            ],
            [
                "What were the key results of Matthew's internship at NASA's Deep Space Network (Peraton)?",
                None,
            ],
            [
                "Give me a general background on Matthew's education and work experience.",
                None,
            ],
            [
                "Which tools, technologies, and coding practices does Matthew prefer and why?",
                None,
            ],
            [
                "What are Matthew's strengths and weaknesses, and how is he addressing growth areas?",
                None,
            ],
            [
                "What are Matthew's hobbies and personal interests?",
                None,
            ],
            ["Why did Matthew choose to pursue a degree in computer science?", None],
            ["Does Matthew have any leadership experience?", None],
        ],
        save_history=True,
        run_examples_on_click=False,
        cache_examples=False,
    )
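
    # Populate the browser ID on page load (assumed wiring: load_browser_id is
    # not referenced anywhere else, so registering it here is a best guess).
    demo.load(
        load_browser_id,
        inputs=browser_id_state,
        outputs=browser_id_state,
    )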

if __name__ == "__main__":
    demo.launch()