import base64
import json
import logging
import os
import uuid
from typing import Dict, Generator, List, Optional, Tuple

import gradio as gr
import numpy as np
from dotenv import load_dotenv
from openai import OpenAI
from sentence_transformers import SentenceTransformer

from utils.chatLogger import ChatUploader
from utils.utils import (
    get_docs,
    get_keys_chunks,
    get_messages,
    get_top_chunk_keys,
    load_knowledge_base,
)

# --------------- Logging ---------------


def _setup_logging() -> logging.Logger:
    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s:%(name)s:%(message)s",
    )
    return logging.getLogger(__name__)


# --------------- Initialization ---------------


def _require_env(var: str) -> str:
    val = os.getenv(var)
    if not val:
        raise RuntimeError(f"Missing required environment variable: {var}")
    return val


def initialize() -> Tuple[
    SentenceTransformer,
    List[Tuple[str, np.ndarray]],
    Dict,
    OpenAI,
    logging.Logger,
    Optional[ChatUploader],
]:
    logger = _setup_logging()
    logger.info("Initializing application...")

    load_dotenv(override=False)
    logger.info(".env loaded (override=False)")

    embedding_model_path = "ibm-granite/granite-embedding-125m-english"
    embedding_model = SentenceTransformer(embedding_model_path)
    logger.info("Embedding model loaded: %s", embedding_model_path)

    knowledge_base = load_knowledge_base()
    logger.info("Knowledge base loaded")

    pairs = list(get_keys_chunks(knowledge_base))
    if not pairs:
        raise RuntimeError("Knowledge base is empty: no chunks to encode.")
    keys, chunks = zip(*pairs)
    logger.info("KB chunks extracted: %d", len(chunks))

    chunks_encoded = embedding_model.encode(
        list(chunks),
        batch_size=64,
        convert_to_numpy=True,
        show_progress_bar=False,
    )
    keys_chunks_encoded = list(zip(keys, chunks_encoded))
    logger.info("KB chunks encoded: %d", len(keys_chunks_encoded))

    inference_api_key = _require_env("INFERENCE_API_KEY")
    openai_client = OpenAI(
        base_url="https://api.inference.net/v1",
        api_key=inference_api_key,
    )
    logger.info("OpenAI client initialized (base_url=api.inference.net)")

    chat_uploader: Optional[ChatUploader] = None
    drive_creds_b64 = os.getenv("GOOGLE_DRIVE_SERVICE_ACCOUNT_CREDENTIALS_BASE64")
    if drive_creds_b64:
        try:
            service_account_json = json.loads(
                base64.b64decode(drive_creds_b64).decode()
            )
            chat_uploader = ChatUploader(service_account_json)
            logger.info("Google Drive uploader configured")
        except Exception as e:
            logger.warning(
                "Google Drive uploader not configured (error parsing creds): %s", e
            )
            chat_uploader = None
    else:
        logger.info("Google Drive uploader not configured (no creds env var)")

    logger.info("Initialization complete")
    return (
        embedding_model,
        keys_chunks_encoded,
        knowledge_base,
        openai_client,
        logger,
        chat_uploader,
    )


(
    embedding_model,
    keys_chunks_encoded,
    knowledge_base,
    openai_client,
    logger,
    chat_uploader,
) = initialize()

# --------------- Helpers ---------------


def _strip_think_tags(text: str) -> str:
    """Remove the reasoning tags some models emit around chain-of-thought."""
    return text.replace("<think>", "").replace("</think>", "")


def _to_minimal(history: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Keep only role/content keys to avoid metadata/options noise in uploads."""
    minimal: List[Dict[str, str]] = []
    for m in history:
        role = m.get("role")
        content = m.get("content", "")
        if role is None:
            # ignore malformed entries
            continue
        minimal.append({"role": role, "content": content})
    return minimal
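
# For reference, a minimal sketch of the contract this module assumes from
# utils.get_top_chunk_keys: rank (key, embedding) pairs by cosine similarity
# against the query embedding and return the keys of the top-n chunks. The
# real helper lives in utils/utils.py and may differ; this sketch is
# illustrative only and is not called by the app.


def _top_chunk_keys_sketch(
    query_encoded: np.ndarray,
    encoded_pairs: List[Tuple[str, np.ndarray]],
    top_n: int = 5,
) -> List[str]:
    keys, vecs = zip(*encoded_pairs)
    mat = np.vstack(vecs)
    # Cosine similarity of every chunk embedding against the query.
    sims = (mat @ query_encoded) / (
        np.linalg.norm(mat, axis=1) * np.linalg.norm(query_encoded) + 1e-12
    )
    order = np.argsort(sims)[::-1][:top_n]
    return [keys[i] for i in order]
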
# --------------- RAG Chatbot ---------------


def rag_chatbot(
    user_message: str,
    chat_history: List[Dict[str, str]],
    browser_id: str,
) -> Generator[List[Dict[str, str]], None, None]:
    """Stream assistant output as a single growing message dict.

    Do NOT mutate chat_history; Gradio manages it for type="messages".
    """
    # RAG retrieval
    try:
        logger.info("RAG: encoding query & retrieving docs")
        user_query_encoded = embedding_model.encode(
            [user_message], convert_to_numpy=True
        )[0]
        top_chunk_keys = get_top_chunk_keys(
            user_query_encoded, keys_chunks_encoded, top_n=5
        )
        docs = get_docs(top_chunk_keys, knowledge_base)
        logger.info("RAG: docs retrieved=%d (top_n=5)", len(docs))
    except Exception as e:
        logger.error("RAG: retrieval failed: %s", e)
        yield [
            {
                "role": "assistant",
                "content": "⚠️ An error occurred during document retrieval. Please try again later.",
            }
        ]
        return

    # LLM stream
    try:
        logger.info(
            "LLM: opening streaming completion (model=mistralai/mistral-nemo-12b-instruct/fp-8)"
        )
        messages = get_messages(docs, user_message, chat_history)
        chat_stream = openai_client.chat.completions.create(
            model="mistralai/mistral-nemo-12b-instruct/fp-8",
            messages=messages,
            stream=True,
        )
        logger.info("LLM: stream opened")
    except Exception as e:
        logger.error("LLM: API call failed: %s", e)
        yield [
            {
                "role": "assistant",
                "content": "⚠️ An error occurred during client API call. Please try again later.",
            }
        ]
        return

    # Stream parse → yield a single growing assistant message
    assistant_msg = {"role": "assistant", "content": ""}
    try:
        logger.info("LLM: streaming started")
        buffer = ""
        chunks_seen = 0
        content_events = 0
        chars_emitted = 0
        for chunk in chat_stream:
            chunks_seen += 1
            choices = getattr(chunk, "choices", None)
            if not choices:
                continue
            delta = getattr(choices[0], "delta", None)
            if not delta:
                continue
            piece = getattr(delta, "content", None)
            if piece is None:
                continue
            piece = _strip_think_tags(piece)
            if not piece:
                continue
            content_events += 1
            buffer += piece
            # Flush in ~24-char batches (or on newline) to limit UI updates.
            if len(buffer) >= 24 or "\n" in buffer:
                assistant_msg["content"] += buffer
                chars_emitted += len(buffer)
                yield [assistant_msg]  # append/update single assistant bubble
                buffer = ""
        if buffer:
            assistant_msg["content"] += buffer
            chars_emitted += len(buffer)
            yield [assistant_msg]
        logger.info(
            "LLM: streaming finished (chunks=%d, content_events=%d, chars=%d)",
            chunks_seen,
            content_events,
            chars_emitted,
        )
    except Exception as e:
        logger.error("LLM: streaming failed: %s", e)
        if assistant_msg["content"]:
            assistant_msg["content"] += (
                "\n\n⚠️ An error occurred during LLM response streaming. Please try again later."
            )
            yield [assistant_msg]
        else:
            yield [
                {
                    "role": "assistant",
                    "content": "⚠️ An error occurred during LLM response streaming. Please try again later.",
                }
            ]
        return
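
    # The uploader contract assumed below (see utils/chatLogger.py):
    # upload_chat_history(history, browser_id, filename=..., mode="overwrite")
    # replaces any existing Drive file with the same name rather than creating
    # duplicates, so each browser session keeps one rolling transcript. That
    # behavior is assumed from the call site, not shown here.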
    # --- Upload transcript (optional): reconstruct the current turn explicitly
    try:
        if chat_uploader is not None:
            # Gradio passes prior turns in `chat_history`; build the latest
            # full transcript by appending the just-finished exchange.
            prior = _to_minimal(chat_history)
            current_user = {"role": "user", "content": user_message}
            final_history = prior + [
                current_user,
                {"role": "assistant", "content": assistant_msg["content"]},
            ]
            # Ensure we have a usable browser_id for the filename
            if not browser_id:
                browser_id = str(uuid.uuid4())
            drive_filename = f"chat__{browser_id}.json"
            logger.info(
                "Upload: writing Drive file '%s' (messages=%d, mode=overwrite)",
                drive_filename,
                len(final_history),
            )
            chat_uploader.upload_chat_history(
                final_history,
                browser_id,
                filename=drive_filename,
                mode="overwrite",  # overwrite-by-name semantics
            )
            logger.info("Upload: completed")
        else:
            logger.info("Upload: skipped (uploader not configured)")
    except Exception as e:
        logger.warning("Upload: failed (non-fatal): %s", e)


# --------------- Gradio app ---------------

with gr.Blocks() as demo:
    browser_id_state = gr.BrowserState(default_value=None)

    @demo.load(inputs=browser_id_state, outputs=browser_id_state)
    def load_browser_id(current_id):
        if current_id is None or current_id == "":
            new_id = str(uuid.uuid4())
            logger.info("Browser ID created: %s", new_id)
            return new_id
        logger.info("Browser ID reused: %s", current_id)
        return current_id

    gr.ChatInterface(
        fn=rag_chatbot,
        title="Matthew Schulz's RAG Chatbot 💬🤖",
        additional_inputs=browser_id_state,
        type="messages",
        examples=[
            ["What is Matthew's educational background?", None],
            [
                "What is Matthew's current role at Visa and what problems is he solving?",
                None,
            ],
            [
                "What machine learning projects has Matthew worked on and what were the outcomes?",
                None,
            ],
            ["What research did Matthew conduct at the USC AutoDrive Lab?", None],
            [
                "What did Matthew accomplish as Lead Software Engineer at SchedGo (now EduRoute)?",
                None,
            ],
            [
                "What were the key results of Matthew’s internship at NASA’s Deep Space Network (Peraton)?",
                None,
            ],
            [
                "Give me a general background on Matthew's education and work experience.",
                None,
            ],
            [
                "Which tools, technologies, and coding practices does Matthew prefer and why?",
                None,
            ],
            [
                "What are Matthew’s strengths and weaknesses, and how is he addressing growth areas?",
                None,
            ],
            ["What are Matthew’s hobbies and personal interests?", None],
            ["Why did Matthew choose to pursue a degree in computer science?", None],
            ["Does Matthew have any leadership experience?", None],
        ],
        save_history=True,
        run_examples_on_click=False,
        cache_examples=False,
    )

if __name__ == "__main__":
    demo.launch()
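
# To run locally, assuming this file is saved as app.py and a .env file
# provides INFERENCE_API_KEY (plus, optionally,
# GOOGLE_DRIVE_SERVICE_ACCOUNT_CREDENTIALS_BASE64 for transcript uploads):
#
#   pip install gradio openai sentence-transformers python-dotenv numpy
#   python app.py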