Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| from typing import List, Tuple, Optional | |
| import numpy as np | |
| import faiss | |
| import fitz # PyMuPDF | |
| from sentence_transformers import SentenceTransformer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from langchain_core.documents import Document | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import FAISS | |
| # Logger setup | |
| logger = logging.getLogger(__name__) | |
| logging.basicConfig(level=logging.INFO) | |
| # app/embeddings.py | |
| import os | |
| from typing import List, Dict | |
| from langchain_community.vectorstores import FAISS | |
| from langchain.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.docstore.in_memory import InMemoryDocstore | |
| from langchain.schema import Document | |
# Shared embedding model used by the module-level embed_file() and
# query_file_chunks() helpers. It is instantiated once, at import time.
embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
# Path to FAISS index: the directory the helpers save the index to and load it
# from. It is created at import time if it does not already exist.
FAISS_INDEX_DIR = "vector_index"
os.makedirs(FAISS_INDEX_DIR, exist_ok=True)
def embed_file(file_path: str) -> bool:
    """
    Embed the full contents of a text file and persist the resulting FAISS index.

    The file is read as UTF-8 and stored as a single document whose metadata
    records ``file_path`` under the ``"source"`` key. The index is then saved
    to ``FAISS_INDEX_DIR``.

    Args:
        file_path: Path of the text file to embed.

    Returns:
        True once the index has been saved.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"β File not found: {file_path}")

    with open(file_path, "r", encoding="utf-8") as handle:
        contents = handle.read()

    # One entry per file: the whole text is embedded as a single document.
    index = FAISS.from_texts(
        [contents],
        embedding_model,
        metadatas=[{"source": file_path}],
    )
    index.save_local(FAISS_INDEX_DIR)
    return True
def query_file_chunks(query: str, k: int = 3) -> List[Document]:
    """
    Load the saved FAISS vector store and run a semantic search against it.

    Args:
        query: Query text to search for.
        k: Maximum number of documents to return.

    Returns:
        The documents returned by the store's similarity search.

    Raises:
        RuntimeError: If the vector store cannot be loaded from
            ``FAISS_INDEX_DIR``. The underlying error is attached as the
            cause so the original traceback is preserved.
    """
    try:
        # NOTE(review): FAISS.load_local unpickles the stored docstore. Recent
        # langchain-community releases refuse to do so unless
        # allow_dangerous_deserialization=True is passed -- confirm against the
        # pinned version, and only enable it for indexes this app wrote itself.
        vector_store = FAISS.load_local(FAISS_INDEX_DIR, embedding_model)
    except Exception as e:
        raise RuntimeError(f"β Failed to load vector store: {e}") from e
    return vector_store.similarity_search(query, k=k)
# Optionally define DocStore for manual use
# NOTE(review): the `class DocStore` definition that follows rebinds this name,
# so this InMemoryDocstore instance is not reachable as `DocStore` afterwards.
DocStore = InMemoryDocstore({})
# === PDF Document Store using FAISS & SentenceTransformers ===
class DocStore:
    """
    In-memory store of PDF text chunks searchable through a flat L2 FAISS index.

    ``texts[i]`` and ``metadata[i]`` describe the vector stored at position
    ``i`` of ``index``, so the three must always grow together.
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2", embedding_dim: int = 384):
        """
        Args:
            model_name: SentenceTransformer model used to encode chunks and queries.
            embedding_dim: Dimensionality of the FAISS index; it has to match
                the size of the vectors the model produces.
        """
        self.model = SentenceTransformer(model_name)
        self.index = faiss.IndexFlatL2(embedding_dim)
        self.texts: List[str] = []
        self.metadata: List[str] = []

    def _chunk_text(self, text: str, chunk_size: int = 1000) -> List[str]:
        """Split ``text`` into consecutive pieces of at most ``chunk_size`` characters."""
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    def add_document(self, filepath: str):
        """
        Extract the text of a PDF, chunk it, and add the chunks to the index.

        Args:
            filepath: Path of the PDF to ingest; also recorded as each chunk's
                metadata entry.
        """
        # Use the document as a context manager so the file handle is released
        # as soon as the text has been extracted.
        with fitz.open(filepath) as doc:
            full_text = "\n".join(page.get_text() for page in doc)

        chunks = self._chunk_text(full_text)
        if chunks:
            # Encode all chunks in one call instead of one model call per chunk.
            embeddings = np.asarray(self.model.encode(chunks), dtype=np.float32)
            # Add to the index first: if that fails, texts/metadata stay
            # aligned with the vectors actually stored.
            self.index.add(embeddings)
            self.texts.extend(chunks)
            self.metadata.extend([filepath] * len(chunks))
        logger.info(f"π Added {len(chunks)} chunks from {filepath} to FAISS index.")

    def retrieve(self, query: str, top_k: int = 3) -> List[Tuple[str, str]]:
        """
        Return up to ``top_k`` stored chunks closest to ``query``.

        Args:
            query: Query text to encode and search for.
            top_k: Maximum number of results; non-positive values yield ``[]``.

        Returns:
            ``(source_filepath, chunk_text)`` tuples in the order returned by
            the index search.
        """
        if top_k <= 0:
            return []

        query_vector = np.asarray(self.model.encode(query), dtype=np.float32)
        _distances, indices = self.index.search(np.array([query_vector]), top_k)

        results = []
        for raw_idx in indices[0]:
            idx = int(raw_idx)
            # The index pads missing neighbours with -1 when it holds fewer
            # than top_k vectors; a bare `idx < len(...)` check would accept
            # -1 and return the last chunk (or fail on an empty store).
            if 0 <= idx < len(self.texts):
                results.append((self.metadata[idx], self.texts[idx]))
        return results
# === Utility to Add Documents to LangChain VectorStore ===
def add_to_vector_store(documents: List[Document | dict], vector_store) -> bool:
    """
    Add documents to a vector store, converting dict entries to ``Document``.

    Args:
        documents: Items to add. Each entry may be a ``Document`` or a dict of
            ``Document`` constructor keyword arguments; the two may be mixed.
        vector_store: Object exposing ``add_documents(documents)``.

    Returns:
        True if the documents were added, False if any step raised (the error
        is logged with its traceback rather than propagated).
    """
    try:
        # Convert every dict entry individually. Checking only the first
        # element would pass raw dicts through whenever the list is mixed.
        normalized = [
            Document(**doc) if isinstance(doc, dict) else doc
            for doc in documents
        ]
        logger.info(f"π¦ Adding {len(normalized)} documents to vector store...")
        vector_store.add_documents(normalized)
        logger.info("β Documents added successfully.")
        return True
    except Exception as e:
        logger.error(f"β Error adding to vector store: {e}", exc_info=True)
        return False
# === Local In-Memory Embedding + Search ===
class LocalEmbeddingStore:
    """
    Keep per-file ``(chunk, vector)`` pairs in memory and rank chunks by
    cosine similarity to a query.
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2", chunk_size: int = 300):
        """
        Args:
            model_name: SentenceTransformer model used for chunks and queries.
            chunk_size: Number of characters per chunk; must be positive.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        # Reject bad sizes up front: zero makes range() fail later and a
        # negative step silently produces no chunks at all.
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        self.model = SentenceTransformer(model_name)
        self.chunk_size = chunk_size
        self.store: dict[str, List[Tuple[str, np.ndarray]]] = {}

    def embed_file(self, filepath: str) -> dict:
        """
        Read a UTF-8 text file, split it into fixed-size chunks, and embed them.

        Args:
            filepath: Path of the file; also the key the chunks are stored under.

        Returns:
            ``{"chunks": <number of chunks embedded>}``.
        """
        with open(filepath, "r", encoding="utf-8") as f:
            content = f.read()
        chunks = [content[i:i + self.chunk_size] for i in range(0, len(content), self.chunk_size)]
        vectors = self.model.encode(chunks)
        self.store[filepath] = list(zip(chunks, vectors))
        logger.info(f"π Embedded {len(chunks)} chunks from {filepath}.")
        return {"chunks": len(chunks)}

    @staticmethod
    def _cosine_scores(query_vec: np.ndarray, matrix: np.ndarray) -> np.ndarray:
        """Return the cosine similarity between ``query_vec`` and each row of ``matrix``."""
        denom = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query_vec)
        # A zero-length vector has a zero dot product too; dividing by 1
        # yields a similarity of 0 instead of NaN.
        denom = np.where(denom == 0, 1.0, denom)
        return (matrix @ query_vec) / denom

    def query(self, filename: str, query: str, top_k: int = 3) -> dict:
        """
        Return the ``top_k`` chunks of an embedded file most similar to ``query``.

        Args:
            filename: Key previously passed to ``embed_file``.
            query: Query text.
            top_k: Number of best-scoring chunks to include.

        Returns:
            ``{"error": "File not embedded"}`` if the file is unknown, otherwise
            ``{"answer": <chunks joined by blank lines, best match first>}``.
        """
        if filename not in self.store:
            return {"error": "File not embedded"}

        chunks_vectors = self.store[filename]
        if not chunks_vectors:
            # Nothing was embedded for this file (e.g. it was empty).
            return {"answer": ""}

        query_vec = np.asarray(self.model.encode([query])[0])
        texts = [text for text, _ in chunks_vectors]
        matrix = np.stack([np.asarray(vec) for _, vec in chunks_vectors])

        # Score every chunk in one vectorised pass rather than one call per chunk.
        scores = self._cosine_scores(query_vec, matrix)
        ranked = sorted(zip(texts, scores), key=lambda pair: pair[1], reverse=True)[:top_k]
        return {"answer": "\n\n".join(text for text, _ in ranked)}
# === LangChain-Compatible FAISS Vector Store Manager ===
class VectorStoreManager:
    """
    Thin wrapper that loads, extends, persists, and queries a LangChain FAISS
    store kept at ``index_path``.
    """

    def __init__(self, embedding_model=None, index_path: str = "db_index"):
        """
        Args:
            embedding_model: Embeddings object handed to FAISS; a default
                ``HuggingFaceEmbeddings()`` is created when omitted.
            index_path: Directory the index is loaded from and saved to.
        """
        # Explicit None check so a supplied object is never replaced merely
        # because it evaluates as falsy.
        if embedding_model is None:
            embedding_model = HuggingFaceEmbeddings()
        self.embedding_model = embedding_model
        self.index_path = index_path
        self.db: Optional[FAISS] = None

    def init_vector_store(self):
        """Load the index from ``index_path`` if it exists; otherwise only log a warning."""
        if os.path.exists(self.index_path):
            # NOTE(review): FAISS.load_local unpickles stored data. Recent
            # langchain-community releases require
            # allow_dangerous_deserialization=True here -- confirm against the
            # pinned version before enabling it.
            self.db = FAISS.load_local(self.index_path, self.embedding_model)
            logger.info(f"π Loaded existing FAISS index from {self.index_path}")
        else:
            logger.warning(f"β οΈ No index found at {self.index_path}. It will be created on first add.")

    def add_texts(self, texts: List[str], ids: Optional[List[str]] = None):
        """
        Add texts to the store (creating it on first use) and save it to disk.

        Args:
            texts: Texts to embed and add. An empty list is a no-op.
            ids: Optional ids passed through to FAISS alongside ``texts``.
        """
        if not texts:
            # There is nothing to embed, so skip building or saving an index.
            logger.warning("No texts supplied; vector store left unchanged.")
            return

        if self.db is None:
            self.db = FAISS.from_texts(texts, self.embedding_model, ids=ids)
        else:
            self.db.add_texts(texts=texts, ids=ids)
        self.db.save_local(self.index_path)
        logger.info(f"β Saved FAISS index with {len(texts)} texts to {self.index_path}")

    def similarity_search(self, query: str, k: int = 3) -> "List[Document]":
        """
        Return the store's ``k`` most similar entries for ``query``.

        Returns:
            Whatever the underlying store's ``similarity_search`` returns, or
            an empty list when no store has been loaded or created yet.
        """
        if self.db is None:
            logger.warning("β οΈ Vector store not initialized.")
            return []
        return self.db.similarity_search(query, k=k)
# === Test Usage ===
if __name__ == "__main__":
    sample_pdf = "sample.pdf"
    sample_txt = "data/sample.txt"

    # FAISS PDF store: ingest the sample PDF (if present) and show the top hits.
    store = DocStore()
    if os.path.exists(sample_pdf):
        store.add_document(sample_pdf)
        for meta, chunk in store.retrieve("What is the return policy?"):
            print(f"\nπ File: {meta}\nπ Snippet: {chunk[:200]}...\n")

    # Local text store: embed the sample text file (if present) and query it.
    local_store = LocalEmbeddingStore()
    if os.path.exists(sample_txt):
        embed_summary = local_store.embed_file(sample_txt)
        print(embed_summary)
        query_result = local_store.query(sample_txt, "discount offers")
        print(query_result)

    # VectorStoreManager test: load or create the index, add one text, search it.
    vsm = VectorStoreManager()
    vsm.init_vector_store()
    vsm.add_texts(["This is a test document."], ids=["test_doc_1"])
    hits = vsm.similarity_search("test")
    print(hits)