import os # Ensure Streamlit and ML caches write to a writable location (e.g., on HF Spaces) os.environ["HOME"] = "/tmp" os.environ["STREAMLIT_BROWSER_GATHER_USAGE_STATS"] = "false" os.environ["STREAMLIT_GLOBAL_DATA_DIR"] = "/tmp/.streamlit" os.environ["XDG_CACHE_HOME"] = "/tmp/.cache" os.environ["HF_HOME"] = "/tmp/hf" os.environ["TRANSFORMERS_CACHE"] = "/tmp/hf/transformers" os.environ["SENTENCE_TRANSFORMERS_HOME"] = "/tmp/hf/sentence-transformers" os.environ["TORCH_HOME"] = "/tmp/torch" # Create the cache directories for _d in [ os.environ["XDG_CACHE_HOME"], os.environ["HF_HOME"], os.environ["TRANSFORMERS_CACHE"], os.environ["SENTENCE_TRANSFORMERS_HOME"], os.environ["TORCH_HOME"], os.environ.get("STREAMLIT_GLOBAL_DATA_DIR", "/tmp/.streamlit"), ]: try: os.makedirs(_d, exist_ok=True) except Exception: pass import streamlit as st from dotenv import load_dotenv, find_dotenv from langchain.prompts import PromptTemplate from langchain.schema.runnable import RunnablePassthrough from langchain_core.runnables import RunnableLambda from langchain_groq import ChatGroq import time from src.vector_store import build_or_load_vectorstore from src.ingestion import load_data_subset, preprocess_dataframe, df_to_documents, load_hf_dataset from src.retriever import build_advanced_retriever from src.config import DATA_PATH, FAISS_INDEX_PATH, GROQ_API_KEY, GEMINI_API_KEY, ANTHROPIC_API_KEY, GROQ_MODEL, GEMINI_MODEL, ANTHROPIC_MODEL load_dotenv(find_dotenv()) # Initialize global vectorstore reference to avoid NameError before it is set vectorstore = None # PAGE CONFIG - Must be first Streamlit command st.set_page_config( page_title="Research Assistant", page_icon="🤖", layout="wide", initial_sidebar_state="expanded" # Start with sidebar expanded ) # ENHANCED CUSTOM CSS - ChatGPT-like styling st.markdown(""" """, unsafe_allow_html=True) # Title with emoji and clean design col1, col2, col3 = st.columns([1, 6, 1]) with col2: st.markdown("

🤖 Research Assistant

", unsafe_allow_html=True) st.markdown("

Powered by Multi-LLM RAG + FAISS

", unsafe_allow_html=True) # Sidebar controls with improved organization with st.sidebar: st.markdown("### ⚙️ Configuration") with st.expander("📊 Dataset Info", expanded=False): index_repo = os.environ.get("HF_DATASET_REPO_ID", "Wasifjafri/research-paper-vdb") index_dir = os.environ.get("FAISS_INDEX_REMOTE_DIR", "faiss_index") source_ds = os.environ.get("HF_SOURCE_DATASET", "") st.markdown(f""" **Vector index:** downloaded from `{index_repo}/{index_dir}` (HF dataset) Rebuild (optional) requires a papers dataset set via env: - `HF_SOURCE_DATASET` = `/` (e.g., `CShorten/ML-ArXiv-Papers`) If not set, the app will skip rebuilding and just use the packaged FAISS index. Current HF_SOURCE_DATASET: `{source_ds or 'not set'}` """) st.markdown("---") with st.expander("🔍 Retrieval Settings", expanded=False): base_k = st.slider("Initial fetch", 4, 30, 20, 1, help="Number of documents to initially retrieve") rerank_k = st.slider("Final docs", 1, 12, 8, 1, help="Number of documents after reranking") dynamic = st.checkbox("Dynamic k", True, help="Adjust retrieval size dynamically") use_rerank = st.checkbox("Use reranking", True, help="Apply reranking for better relevance") with st.expander("🔧 Advanced Filters"): primary_category = st.text_input("Category filter", "", help="Filter by arXiv category") or None col1, col2 = st.columns(2) with col1: year_min = st.number_input("Min year", value=0, step=1) with col2: year_max = st.number_input("Max year", value=0, step=1) if year_min == 0: year_min = None if year_max == 0: year_max = None st.markdown("---") with st.expander("🔄 Index Management", expanded=False): subset_size = st.number_input("Dataset size", 1000, 100000, 10000, 1000) rebuild = st.button("🔨 Rebuild Index", use_container_width=True) st.markdown("---") with st.expander("🤖 LLM Provider", expanded=False): # Determine default provider based on available API keys if ANTHROPIC_API_KEY: default_provider = "Anthropic (Claude)" elif GEMINI_API_KEY: default_provider = "Gemini" elif GROQ_API_KEY: default_provider = "Groq" else: default_provider = "Gemini" available_providers = ["Anthropic (Claude)", "Gemini", "Groq"] try: default_index = available_providers.index(default_provider) except ValueError: default_index = 0 provider = st.selectbox("Provider", available_providers, index=default_index) if provider == "Anthropic (Claude)": ui_anthropic_model = st.selectbox( "Model", [ "claude-sonnet-4-5-20250929", "claude-opus-4-1-20250805", "claude-opus-4-20250514", "claude-sonnet-4-20250514", "claude-3-7-sonnet-20250219", "claude-3-5-haiku-20241022", "claude-3-haiku-20240307" ], index=3 ) ui_gemini_model = None ui_groq_model = None elif provider == "Gemini": ui_gemini_model = st.text_input("Model", GEMINI_MODEL) ui_groq_model = None ui_anthropic_model = None else: ui_groq_model = st.text_input("Model", GROQ_MODEL) ui_gemini_model = None ui_anthropic_model = None # Stats at bottom st.markdown("---") try: if 'vectorstore' in locals(): index_stats = vectorstore.index.ntotal if hasattr(vectorstore, 'index') else "Unknown" st.metric("📚 Embeddings", f"{index_stats:,}" if isinstance(index_stats, int) else index_stats) except: pass # Build or load vectorstore from typing import Optional def _load_df_from_hf(num_records: int, dataset_name: Optional[str] = None): """Load dataset from Hugging Face when rebuilding is explicitly requested. Only used for index rebuilds; normal path downloads the ready-made FAISS index. """ ds_name = dataset_name or os.environ.get("HF_SOURCE_DATASET") if not ds_name: st.error("❌ Rebuild requested but HF_SOURCE_DATASET is not set. Set it to a dataset like 'CShorten/ML-ArXiv-Papers'.") st.stop() try: with st.spinner(f"🔄 Loading papers from Hugging Face dataset: {ds_name}..."): df = load_hf_dataset(num_records=num_records, dataset_name=ds_name) return preprocess_dataframe(df) except Exception as e: st.error(f"❌ Failed to load dataset '{ds_name}': {e}") st.info("💡 If the dataset is private, add your HF token as a secret and set HF_SOURCE_DATASET.") st.stop() # Default path: try to download+load the FAISS index from HF dataset repo if not rebuild: try: vectorstore = build_or_load_vectorstore([], force_rebuild=False) except Exception as e: st.error("❌ Could not load the FAISS index from the configured dataset repo.") st.info("💡 Check HF_DATASET_REPO_ID/FAISS_INDEX_REMOTE_DIR env vars and that the dataset has index.faiss/index.pkl.") st.stop() else: # Rebuild only when explicitly requested and a source dataset is configured with st.spinner("🔨 Rebuilding vector index from source dataset..."): df = _load_df_from_hf(num_records=int(subset_size)) docs = df_to_documents(df) vectorstore = build_or_load_vectorstore( docs, force_rebuild=True, chunk_method="semantic", chunk_size=1000, chunk_overlap=125 ) def make_llm(provider_name: str): if provider_name == "Anthropic (Claude)": if not ANTHROPIC_API_KEY: st.error("❌ ANTHROPIC_API_KEY not set") st.stop() try: from langchain_anthropic import ChatAnthropic return ChatAnthropic( model=ui_anthropic_model or ANTHROPIC_MODEL, temperature=0.7, max_tokens=2048, api_key=ANTHROPIC_API_KEY, ) except Exception as e: st.error(f"❌ Claude initialization failed: {e}") st.stop() if provider_name == "Gemini": if not GEMINI_API_KEY: st.error("❌ GEMINI_API_KEY not set") st.stop() try: from langchain_google_genai import ChatGoogleGenerativeAI return ChatGoogleGenerativeAI( model=ui_gemini_model or GEMINI_MODEL, temperature=0.7, max_output_tokens=1024, api_key=GEMINI_API_KEY, ) except Exception as e: st.error(f"❌ Gemini initialization failed: {e}") st.stop() if not GROQ_API_KEY: st.error("❌ No valid LLM provider configured") st.stop() return ChatGroq( model=ui_groq_model or GROQ_MODEL, temperature=0.7, max_tokens=1024, groq_api_key=GROQ_API_KEY, ) llm = make_llm(provider) # Relevance checking prompt relevance_check_prompt = """You are a research paper relevance checker. Your task is to determine if the retrieved documents are relevant to the user's question. Retrieved Documents: {context} User Question: {question} Instructions: - Carefully analyze whether the retrieved documents contain information that can answer the user's question - Consider if the documents discuss the topic, concepts, or methods mentioned in the question - Respond with ONLY one word: "RELEVANT" or "IRRELEVANT" - Be strict: if the documents are only tangentially related or don't actually address the question, respond "IRRELEVANT" Response:""" relevance_prompt = PromptTemplate(template=relevance_check_prompt, input_variables=["context", "question"]) # IMPROVED PROMPT prompt_template = """You are a knowledgeable and helpful research assistant specializing in arXiv papers. You MUST ONLY answer questions based on the provided research papers context. Context from Research Papers: {context} User Question: {question} CRITICAL RULES: - ONLY use information from the provided research papers context above - DO NOT use your general knowledge or training data - If the context doesn't contain relevant information, you MUST respond with: "I couldn't find relevant information about this topic in the available research papers. The retrieved documents don't address your question. Please try different search terms or the database may not contain papers on this specific topic." Instructions: - Analyze the user's question and provide a thorough, well-structured response BASED ONLY ON THE CONTEXT - Be conversational and descriptive - explain concepts clearly with sufficient detail - Use multiple paragraphs when needed to fully address the question **For paper listing requests** (e.g., "find papers", "list papers", "show papers"): Format as a structured list with detailed summaries: **Paper #[Number]: [Title]** - **Authors:** [Author names] - **Year:** [Publication year] - **ArXiv ID:** [ID if available] - **Category:** [Research category] - **Summary:** [3-4 sentences explaining the paper's objectives, methodology, key contributions, and findings based on the context] **For specific questions** (e.g., "What is...", "Explain...", "How does...", "What is the purpose of..."): - Provide a comprehensive, multi-paragraph answer that fully addresses the question USING ONLY THE CONTEXT - Start with a clear overview or direct answer from the papers - Elaborate with details, context, and explanations from the research papers - Discuss relevant methodologies, findings, implications, or technical details found in the papers - Cite sources naturally throughout (e.g., "According to the research by [Authors] (Year)...") - Use clear transitions between ideas - Conclude with key takeaways or significance when appropriate **General Guidelines:** - Write in a natural, conversational tone similar to ChatGPT - Aim for depth and clarity - don't give one-liner responses - Break complex information into digestible paragraphs - Use examples and analogies when helpful from the context - NEVER invent or hallucinate information not in the context - Always prioritize being helpful, informative, and thorough - but ONLY based on the provided context Answer:""" prompt = PromptTemplate(template=prompt_template, input_variables=["context", "question"]) def _format_metadata(metadata): """Format metadata in a clean, readable way.""" if not metadata: return "" meta_lines = [] if metadata.get("title"): meta_lines.append(f"📄 {metadata['title']}") if metadata.get("id"): meta_lines.append(f"🔗 {metadata['id']}") if metadata.get("authors") and metadata["authors"] != "N/A": authors = metadata['authors'] if len(authors) > 100: authors = authors[:100] + "..." meta_lines.append(f"👥 {authors}") if metadata.get("year"): meta_lines.append(f"📅 {metadata['year']}") if metadata.get("primary_category") and metadata["primary_category"] != "N/A": meta_lines.append(f"🏷️ {metadata['primary_category']}") return " • ".join(meta_lines) def format_docs(docs): """Format documents with clear structure and metadata.""" if not docs: return "No relevant documents found in the database." formatted_chunks = [] for idx, doc in enumerate(docs, start=1): meta_str = _format_metadata(doc.metadata) content = doc.page_content.strip() if len(content) > 1000: content = content[:1000] + "..." formatted_chunk = f"[Document {idx}]\n{meta_str}\n\n{content}" formatted_chunks.append(formatted_chunk) return "\n\n" + "="*80 + "\n\n".join(formatted_chunks) def build_chain(): """Build the RAG chain with improved retrieval.""" retriever = build_advanced_retriever( vectorstore, base_k=base_k, rerank_k=rerank_k, primary_category=primary_category, year_min=year_min, year_max=year_max, dynamic=dynamic, use_rerank=use_rerank, ) def retrieval_with_logging(q): try: docs = retriever.get_relevant_documents(q) return format_docs(docs) except Exception as e: return f"Error retrieving documents: {e}" retrieval_runnable = RunnableLambda(retrieval_with_logging) chain = {"context": retrieval_runnable, "question": RunnablePassthrough()} | prompt | llm return chain, retriever # Initialize session state if "messages" not in st.session_state: st.session_state["messages"] = [] st.session_state["show_welcome"] = True # Welcome message with suggestions if st.session_state.get("show_welcome", False): st.markdown("""

👋 Welcome to Research Assistant!

I'm your AI-powered research companion. Ask me anything about Machine Learning papers!

🔍 Find papers on transformers 💡 Explain attention mechanism 📊 Compare CNN vs RNN 🎯 Latest in reinforcement learning
""", unsafe_allow_html=True) st.session_state["show_welcome"] = False # Helper functions def is_casual_conversation(query_text): """Check if the query is a greeting or casual conversation.""" query_lower = query_text.lower().strip() greetings = ["hi", "hello", "hey", "good morning", "good afternoon", "good evening", "hola", "greetings", "howdy", "yo", "sup", "what's up", "whats up"] casual_patterns = [ "how are you", "how r u", "how do you do", "what's up", "whats up", "who are you", "what are you", "what is your name", "your name", "what can you do", "help me", "can you help", "thank you", "thanks", "bye", "goodbye", "see you", "nice to meet you", "pleasure" ] if query_lower in greetings: return True for pattern in casual_patterns: if pattern in query_lower: return True return False def get_casual_response(query_text): """Generate appropriate response for casual conversation.""" query_lower = query_text.lower().strip() if any(word in query_lower for word in ["hi", "hello", "hey", "hola", "howdy", "yo"]): return "Hello! 👋 I'm your AI Research Assistant for Machine Learning papers. How can I help you today?" if "good morning" in query_lower: return "Good morning! ☀️ Ready to explore some ML research? What interests you today?" if "good afternoon" in query_lower: return "Good afternoon! 🌤️ Let's dive into some research! What would you like to learn about?" if "good evening" in query_lower: return "Good evening! 🌙 I'm here to help with ML research. What topic interests you?" if any(phrase in query_lower for phrase in ["how are you", "how r u", "how do you do"]): return "I'm doing great, thanks! 😊 Ready to help you explore ML research. What's on your mind?" if any(phrase in query_lower for phrase in ["who are you", "what are you", "your name"]): return "I'm an AI Research Assistant specialized in Machine Learning! 🤖 I help you find papers, explain concepts, and answer research questions. What would you like to know?" if any(phrase in query_lower for phrase in ["what can you do", "help me", "can you help"]): return """I can help you with: 🔍 **Finding research papers** on specific ML topics 📚 **Explaining ML concepts** from published research 💡 **Answering questions** about techniques and methods 🎓 **Exploring** the latest ML research developments Try asking: - "Find papers on deep learning" - "What is transfer learning?" - "Explain adversarial training" What interests you?""" if any(word in query_lower for word in ["thank you", "thanks", "thx"]): return "You're welcome! 😊 Happy to help! Let me know if you have other questions." if any(word in query_lower for word in ["bye", "goodbye", "see you"]): return "Goodbye! 👋 Come back anytime for ML research help. Happy learning!" return "I'm here to help with Machine Learning research! 😊 Ask me about any ML topics or papers." # Chat input query = st.chat_input("💬 Ask me anything about ML research...") # Display chat history for i, msg in enumerate(st.session_state["messages"]): # Show user message st.chat_message("user", avatar="👤").write(msg["query"]) # Show assistant response if available if msg.get("answer") is not None: with st.chat_message("assistant", avatar="🤖"): st.write(msg["answer"]) if msg.get("context") and len(msg["context"]) > 0: with st.expander(f"📄 View {len(msg['context'])} Retrieved Documents", expanded=False): for idx, doc in enumerate(msg["context"], 1): st.markdown(f"**📎 Document {idx}**") st.caption(_format_metadata(doc.metadata)) st.text_area( f"Content {idx}", doc.page_content[:800] + ("..." if len(doc.page_content) > 800 else ""), height=150, key=f"doc_{i}_{idx}", disabled=True ) if idx < len(msg["context"]): st.markdown("---") else: # Answer is being generated - show thinking indicator with st.chat_message("assistant", avatar="🤖"): thinking_placeholder = st.empty() thinking_placeholder.markdown('

🔍 Searching research papers...

', unsafe_allow_html=True) # Check if casual conversation if is_casual_conversation(msg["query"]): casual_response = get_casual_response(msg["query"]) # Smooth streaming effect response_placeholder = st.empty() full_response = "" words = casual_response.split() for word in words: full_response += word + " " response_placeholder.markdown(full_response) time.sleep(0.02) st.session_state["messages"][i]["answer"] = casual_response st.rerun() else: # Research question - full RAG pipeline rag_chain, adv_retriever = build_chain() docs = [] answer_text = "" error_occurred = False try: docs = adv_retriever.get_relevant_documents(msg["query"]) if not docs: answer_text = """I couldn't find any relevant research papers in the database that match your query. **💡 Suggestions:** - Try using broader or different search terms - Check the spelling of technical terms - The database may not contain papers on this specific topic - Consider rebuilding the index with more data The current database focuses on ArXiv ML papers, but may not cover all research areas comprehensively.""" else: thinking_placeholder.markdown('

🧠 Analyzing documents...

', unsafe_allow_html=True) # Check relevance formatted_context = format_docs(docs) relevance_check_chain = {"context": RunnablePassthrough(), "question": RunnablePassthrough()} | relevance_prompt | llm relevance_result = relevance_check_chain.invoke({"context": formatted_context, "question": msg["query"]}) relevance_text = relevance_result.content if hasattr(relevance_result, "content") else str(relevance_result) if "IRRELEVANT" in relevance_text.strip().upper(): answer_text = f"""I found {len(docs)} documents in the database, but they don't contain relevant information about your question. **📋 Retrieved topics:** - {docs[0].metadata.get('title', 'Various topics') if docs else 'N/A'} **💡 Suggestions:** - Try rephrasing with different keywords - Use more specific technical terms - Search for related concepts or broader topics - The database may not have papers specifically on this topic I can only provide answers based on the ArXiv papers in the database.""" else: # Generate answer with streaming thinking_placeholder.markdown('

✍️ Generating response...

', unsafe_allow_html=True) answer = rag_chain.invoke(msg["query"]) answer_text = answer.content if hasattr(answer, "content") else str(answer) except Exception as e: error_occurred = True msg_err = str(e) if "models/" in msg_err and "not found" in msg_err.lower(): answer_text = "⚠️ Selected model not found. Try a different model in the sidebar." else: answer_text = f"⚠️ An error occurred: {e}\n\nPlease try again or rebuild the index." # Clear thinking and display response with streaming thinking_placeholder.empty() # Stream response import re response_placeholder = st.empty() parts = re.split(r'(\n\n|(?<=[.!?])\s+)', answer_text) full_response = "" for part in parts: full_response += part response_placeholder.markdown(full_response) time.sleep(0.03) # Update session state st.session_state["messages"][i]["answer"] = answer_text st.session_state["messages"][i]["context"] = docs # Show retrieved documents if docs: with st.expander(f"📄 View {len(docs)} Retrieved Documents", expanded=False): for idx, doc in enumerate(docs, 1): st.markdown(f"**📎 Document {idx}**") st.caption(_format_metadata(doc.metadata)) st.text_area( f"Content {idx}", doc.page_content[:800] + ("..." if len(doc.page_content) > 800 else ""), height=150, key=f"new_doc_{i}_{idx}", disabled=True ) if idx < len(docs): st.markdown("---") st.rerun() # Process new query if query: # Add message to session state immediately st.session_state["messages"].append({ "query": query, "answer": None, "context": [] }) # Force rerun to show the user message immediately st.rerun() # Footer with tips - only show if there are messages if len(st.session_state["messages"]) > 0: st.markdown("---") with st.expander("💡 Tips for Better Results", expanded=False): col1, col2 = st.columns(2) with col1: st.markdown(""" **🎯 Asking Better Questions** ✅ Use specific ML terminology ✅ Mention techniques or methods ✅ Ask for comparisons ✅ Reference specific problems **Examples:** - "Papers on transformer architecture" - "Compare CNNs vs Vision Transformers" - "Explain BERT training methodology" """) with col2: st.markdown(""" **📚 Understanding Responses** ✅ All answers from actual papers ✅ View source documents anytime ✅ Check relevance of results ✅ Adjust settings if needed **⚡ Advanced Tips:** - Use sidebar filters (year, category) - Adjust retrieval settings - Try different LLM providers - Rebuild index for fresh data """) # Add a "Clear Chat" button at the bottom of sidebar with st.sidebar: st.markdown("---") if st.button("🗑️ Clear Chat History", use_container_width=True): st.session_state["messages"] = [] st.session_state["show_welcome"] = True st.rerun()