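"""Gradio RAG chatbot over a personal knowledge base.

Pipeline: embed the user query with a sentence-transformer, retrieve the
top-matching knowledge-base chunks, and stream a grounded LLM completion
back to the UI. Finished transcripts are optionally uploaded to Google Drive.
"""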
import os
import json
import base64
import uuid
import logging
from typing import Generator, List, Dict, Tuple, Optional

import numpy as np
from sentence_transformers import SentenceTransformer
from openai import OpenAI
import gradio as gr
from dotenv import load_dotenv
from utils.utils import (
get_keys_chunks,
get_docs,
get_top_chunk_keys,
get_messages,
load_knowledge_base,
)
from utils.chatLogger import ChatUploader
# --------------- Logging ---------------
def _setup_logging() -> logging.Logger:
logging.basicConfig(
level=logging.INFO,
format="%(levelname)s:%(name)s:%(message)s",
)
return logging.getLogger(__name__)
# --------------- Initialization ---------------
def _require_env(var: str) -> str:
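    """Return the value of environment variable `var`, failing fast if unset."""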
val = os.getenv(var)
if not val:
raise RuntimeError(f"Missing required environment variable: {var}")
return val
def initialize() -> Tuple[
SentenceTransformer,
    List[Tuple[str, np.ndarray]],
Dict,
OpenAI,
logging.Logger,
Optional[ChatUploader],
]:
logger = _setup_logging()
logger.info("Initializing application...")
load_dotenv(override=False)
logger.info(".env loaded (override=False)")
embedding_model_path = "ibm-granite/granite-embedding-125m-english"
embedding_model = SentenceTransformer(embedding_model_path)
logger.info("Embedding model loaded: %s", embedding_model_path)
knowledge_base = load_knowledge_base()
logger.info("Knowledge base loaded")
pairs = list(get_keys_chunks(knowledge_base))
if not pairs:
raise RuntimeError("Knowledge base is empty – no chunks to encode.")
keys, chunks = zip(*pairs)
logger.info("KB chunks extracted: %d", len(chunks))
chunks_encoded = embedding_model.encode(
list(chunks),
batch_size=64,
convert_to_numpy=True,
show_progress_bar=False,
)
keys_chunks_encoded = list(zip(keys, chunks_encoded))
logger.info("KB chunks encoded: %d", len(keys_chunks_encoded))
inference_api_key = _require_env("INFERENCE_API_KEY")
openai_client = OpenAI(
base_url="https://api.inference.net/v1",
api_key=inference_api_key,
)
logger.info("OpenAI client initialized (base_url=api.inference.net)")
chat_uploader: Optional[ChatUploader] = None
drive_creds_b64 = os.getenv("GOOGLE_DRIVE_SERVICE_ACCOUNT_CREDENTIALS_BASE64")
if drive_creds_b64:
try:
service_account_json = json.loads(
base64.b64decode(drive_creds_b64).decode()
)
chat_uploader = ChatUploader(service_account_json)
logger.info("Google Drive uploader configured")
except Exception as e:
logger.warning(
"Google Drive uploader not configured (error parsing creds): %s", e
)
chat_uploader = None
else:
logger.info("Google Drive uploader not configured (no creds env var)")
logger.info("Initialization complete")
return (
embedding_model,
keys_chunks_encoded,
knowledge_base,
openai_client,
logger,
chat_uploader,
)
(
    embedding_model,
    keys_chunks_encoded,
    knowledge_base,
    openai_client,
    logger,
    chat_uploader,
) = initialize()
# --------------- Helpers ---------------
def _strip_think_tags(text: str) -> str:
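    """Drop <think>/</think> markers that some reasoning models emit mid-stream."""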
return text.replace("<think>", "").replace("</think>", "")
def _to_minimal(history: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Keep only role/content keys to avoid metadata/options noise in uploads."""
minimal: List[Dict[str, str]] = []
for m in history:
role = m.get("role")
content = m.get("content", "")
if role is None:
# ignore malformed entries
continue
minimal.append({"role": role, "content": content})
return minimal
# --------------- RAG Chatbot ---------------
def rag_chatbot(
user_message: str,
chat_history: List[Dict[str, str]],
browser_id: str,
) -> Generator[List[Dict[str, str]], None, None]:
"""
Stream assistant output as a single growing message dict.
Do NOT mutate chat_history; Gradio manages it for type="messages".
"""
# RAG retrieval
try:
logger.info("RAG: encoding query & retrieving docs")
user_query_encoded = embedding_model.encode(
[user_message], convert_to_numpy=True
)[0]
top_chunk_keys = get_top_chunk_keys(
            user_query_encoded, keys_chunks_encoded, top_n=5
)
docs = get_docs(top_chunk_keys, knowledge_base)
logger.info("RAG: docs retrieved=%d (top_n=5)", len(docs))
except Exception as e:
logger.error("RAG: retrieval failed: %s", e)
yield [
{
"role": "assistant",
"content": "⚠️ An error occurred during document retrieval. Please try again later.",
}
]
return
# LLM stream
try:
logger.info(
"LLM: opening streaming completion (model=mistralai/mistral-nemo-12b-instruct/fp-8)"
)
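        # Build the prompt: retrieved docs, prior turns, then the new user message.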
messages = get_messages(docs, user_message, chat_history)
        chat_stream = openai_client.chat.completions.create(
model="mistralai/mistral-nemo-12b-instruct/fp-8",
messages=messages,
stream=True,
)
logger.info("LLM: stream opened")
except Exception as e:
logger.error("LLM: API call failed: %s", e)
yield [
{
"role": "assistant",
"content": "⚠️ An error occurred during client API call. Please try again later.",
}
]
return
# Stream parse → yield a single growing assistant message
assistant_msg = {"role": "assistant", "content": ""}
try:
logger.info("LLM: streaming started")
buffer = ""
chunks_seen = 0
content_events = 0
chars_emitted = 0
for chunk in chat_stream:
chunks_seen += 1
choices = getattr(chunk, "choices", None)
if not choices:
continue
delta = getattr(choices[0], "delta", None)
if not delta:
continue
piece = getattr(delta, "content", None)
if piece is None:
continue
piece = _strip_think_tags(piece)
if not piece:
continue
content_events += 1
buffer += piece
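            # Coalesce small deltas: flush to the UI every ~24 chars or at a
            # newline to cut down on re-render churn.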
if len(buffer) >= 24 or "\n" in buffer:
assistant_msg["content"] += buffer
chars_emitted += len(buffer)
yield [assistant_msg] # append/update single assistant bubble
buffer = ""
if buffer:
assistant_msg["content"] += buffer
chars_emitted += len(buffer)
yield [assistant_msg]
logger.info(
"LLM: streaming finished (chunks=%d, content_events=%d, chars=%d)",
chunks_seen,
content_events,
chars_emitted,
)
except Exception as e:
logger.error("LLM: streaming failed: %s", e)
if assistant_msg["content"]:
            assistant_msg["content"] += (
                "\n\n⚠️ An error occurred during LLM response streaming. "
                "Please try again later."
            )
yield [assistant_msg]
else:
yield [
{
"role": "assistant",
"content": "⚠️ An error occurred during LLM response streaming. Please try again later.",
}
]
return
    # Upload transcript (optional): reconstruct the current turn explicitly.
try:
if chat_uploader is not None:
# Gradio passes prior turns in `chat_history`. Build latest full transcript.
prior = _to_minimal(chat_history)
current_user = {"role": "user", "content": user_message}
final_history = prior + [
current_user,
{"role": "assistant", "content": assistant_msg["content"]},
]
# Ensure we have a usable browser_id for the filename
if not browser_id:
browser_id = str(uuid.uuid4())
drive_filename = f"chat__{browser_id}.json"
logger.info(
"Upload: writing Drive file '%s' (messages=%d, mode=overwrite)",
drive_filename,
len(final_history),
)
chat_uploader.upload_chat_history(
final_history,
browser_id,
filename=drive_filename,
mode="overwrite", # <-- overwrite-by-name semantics
)
logger.info("Upload: completed")
else:
logger.info("Upload: skipped (uploader not configured)")
except Exception as e:
logger.warning("Upload: failed (non-fatal): %s", e)
# --------------- Gradio app ---------------
with gr.Blocks() as demo:
browser_id_state = gr.BrowserState(default_value=None)
@demo.load(inputs=browser_id_state, outputs=browser_id_state)
def load_browser_id(current_id):
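        """Assign a persistent per-browser UUID on first visit; reuse it afterward."""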
if current_id is None or current_id == "":
new_id = str(uuid.uuid4())
logger.info("Browser ID created: %s", new_id)
return new_id
logger.info("Browser ID reused: %s", current_id)
return current_id
gr.ChatInterface(
fn=rag_chatbot,
title="Matthew Schulz's RAG Chatbot 💬🤖",
additional_inputs=browser_id_state,
type="messages",
examples=[
["What is Matthew's educational background?", None],
[
"What is Matthew's current role at Visa and what problems is he solving?",
None,
],
[
"What machine learning projects has Matthew worked on and what were the outcomes?",
None,
],
["What research did Matthew conduct at the USC AutoDrive Lab?", None],
[
"What did Matthew accomplish as Lead Software Engineer at SchedGo (now EduRoute)?",
None,
],
[
"What were the key results of Matthew’s internship at NASA’s Deep Space Network (Peraton)?",
None,
],
[
"Give me a general background on Matthew's education and work experience.",
None,
],
[
"Which tools, technologies, and coding practices does Matthew prefer and why?",
None,
],
[
"What are Matthew’s strengths and weaknesses, and how is he addressing growth areas?",
None,
],
[
"What are Matthew’s hobbies and personal interests?",
None,
],
["Why did Matthew choose to pursue a degree in computer science?", None],
["Does Matthew have any leadership experience?", None],
],
save_history=True,
run_examples_on_click=False,
cache_examples=False,
)
if __name__ == "__main__":
demo.launch()