import os.path

import faiss
import torch
from torch import nn
from transformers import AutoTokenizer, AutoModel


class EmbeddingRetriever:
    """Retrieve document chunks by nearest-neighbour search over embeddings.

    Each text is tokenized to a fixed length, passed through a Hugging Face
    encoder, and the encoder's ``last_hidden_state`` is flattened into one
    vector per text. The vectors are stored in a FAISS ``IndexFlatL2`` that
    is cached on disk at ``PATH_IDX``.
    """

    # Fixed token length used for documents and queries alike. Because the
    # hidden states are flattened, both sides must have exactly this many
    # tokens or the vector width will not match the index dimension.
    MAX_LENGTH = 512

    def __init__(self, embedding_model_name, PATH_IDX, chunks, device):
        """Load the tokenizer and encoder, then load or build the index.

        Args:
            embedding_model_name: Model identifier passed to
                ``AutoTokenizer.from_pretrained`` and
                ``AutoModel.from_pretrained``.
            PATH_IDX: File path of the persisted FAISS index.
            chunks: Iterable of objects exposing ``metadata['source']`` and
                ``page_content``; only read when no index file exists yet.
            device: Torch device the encoder and token tensors are moved to.
        """
        self.device = device
        self.PATH_IDX = PATH_IDX
        self.embedding_model_name = embedding_model_name
        self.tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)
        self.embedding_model = AutoModel.from_pretrained(embedding_model_name)
        self.embedding_model.to(device)
        self.index = self.get_idx(chunks)

    @staticmethod
    def _to_numpy(hidden_states):
        """Flatten every row of *hidden_states* and return a NumPy array.

        The tensor is moved to the CPU first: ``Tensor.numpy()`` raises for
        tensors that live on an accelerator.
        """
        flattened = nn.Flatten()(hidden_states)
        return flattened.detach().cpu().numpy()

    def _encode(self, texts):
        """Return the encoder's ``last_hidden_state`` for *texts*.

        Inputs are padded and truncated to ``MAX_LENGTH`` tokens. Inference
        runs under ``torch.no_grad()`` so no autograd graph is retained.
        """
        tokens = self.tokenizer(
            texts,
            padding='max_length',
            truncation=True,
            max_length=self.MAX_LENGTH,
            return_tensors="pt",
        ).to(self.device)
        with torch.no_grad():
            return self.embedding_model(**tokens).last_hidden_state

    def get_idx(self, chunks):
        """Return the FAISS index, loading it from disk when it exists.

        When ``PATH_IDX`` does not exist, every chunk is embedded, a new
        index is built from the embeddings and written to ``PATH_IDX``.
        """
        if os.path.exists(self.PATH_IDX):
            return self.load_faiss_index(self.PATH_IDX)

        texts = [
            "source: {}, content: {}".format(
                chunk.metadata['source'], chunk.page_content
            )
            for chunk in chunks
        ]
        word_embeddings = self._encode(texts)
        index = self.build_faiss_index(word_embeddings)
        self.save_faiss_index(index, self.PATH_IDX)
        return index

    def retrieve_data(self, query, TOP_K):
        """Return the index positions of the ``TOP_K`` entries nearest *query*.

        Args:
            query: Text to embed and search for.
            TOP_K: Number of neighbours requested from the index.

        Returns:
            The first row of the indices array returned by
            ``self.index.search``.
        """
        query_embedding = self._encode(query)
        np_query_embedding = self._to_numpy(query_embedding)
        distances, indices = self.index.search(np_query_embedding, TOP_K)
        return indices[0]

    def build_faiss_index(self, embeddings):
        """Builds a FAISS index for efficient similarity search.

        Args:
            embeddings: Tensor with one entry per document along the first
                axis; the remaining axes are flattened into one vector.

        Returns:
            A ``faiss.IndexFlatL2`` containing the flattened embeddings.
        """
        np_emb = self._to_numpy(embeddings)
        dimension = np_emb.shape[1]
        index = faiss.IndexFlatL2(dimension)  # L2 distance for similarity
        print("shape index:", np_emb.shape)
        index.add(np_emb)
        return index

    def save_faiss_index(self, index, index_file_path):
        """Saves a FAISS index to a file."""
        faiss.write_index(index, index_file_path)
        print(f"FAISS index saved to {index_file_path}")

    def load_faiss_index(self, index_file_path):
        """Loads a FAISS index from a file.

        Returns:
            The loaded index, or ``None`` when *index_file_path* does not
            exist.
        """
        if not os.path.exists(index_file_path):
            return None
        return faiss.read_index(index_file_path)