|
|
import torch |
|
|
from torch import nn |
|
|
from transformers import AutoTokenizer, AutoModel |
|
|
import os.path |
|
|
import faiss |
|
|
|
|
|
class EmbeddingRetriever:
    """Retrieve document chunks by nearest-neighbour search over embeddings.

    Texts are tokenized with a Hugging Face tokenizer, passed through the
    matching encoder model, and the encoder's ``last_hidden_state`` is
    flattened into one vector per text. The vectors are stored in a FAISS
    ``IndexFlatL2`` index that is persisted at ``PATH_IDX``.

    Attributes are plain instance fields set in ``__init__``:
    ``PATH_IDX``, ``embedding_model_name``, ``tokenizer``,
    ``embedding_model`` and ``index``.
    """

    def __init__(self, embedding_model_name, PATH_IDX, chunks):
        """Load the tokenizer/model and obtain the FAISS index.

        Args:
            embedding_model_name: Model identifier passed to
                ``AutoTokenizer.from_pretrained`` and
                ``AutoModel.from_pretrained``.
            PATH_IDX: File path of the persisted FAISS index.
            chunks: Iterable of objects exposing ``metadata['source']`` and
                ``page_content``; only used when no index file exists yet.
        """
        self.PATH_IDX = PATH_IDX
        self.embedding_model_name = embedding_model_name
        self.tokenizer = AutoTokenizer.from_pretrained(embedding_model_name)
        self.embedding_model = AutoModel.from_pretrained(embedding_model_name)

        self.index = self.get_idx(chunks)

    def _encode(self, texts):
        """Tokenize ``texts`` and return the encoder's ``last_hidden_state``.

        Shared by indexing and querying so both sides are encoded the same
        way.

        Args:
            texts: A single string or a list of strings.

        Returns:
            The ``last_hidden_state`` tensor produced by the embedding model.
        """
        # The hidden states are flattened into a single vector per text, so
        # every input must have the same token length: pad AND truncate to
        # the tokenizer's maximum length. Without truncation, an over-long
        # text would yield a vector of a different size (or fail outright).
        encoded = self.tokenizer(
            texts,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        # Inference only: skip autograd bookkeeping to save memory.
        with torch.no_grad():
            return self.embedding_model(**encoded).last_hidden_state

    def get_idx(self, chunks):
        """Return the FAISS index, loading it from disk or building it.

        If a file exists at ``self.PATH_IDX`` it is loaded and ``chunks`` is
        ignored. Otherwise every chunk is embedded, a new index is built and
        written to ``self.PATH_IDX``.

        Args:
            chunks: Iterable of objects exposing ``metadata['source']`` and
                ``page_content``.

        Returns:
            The loaded or freshly built FAISS index.

        Raises:
            ValueError: If no index file exists and ``chunks`` is empty.
        """
        if os.path.exists(self.PATH_IDX):
            return self.load_faiss_index(self.PATH_IDX)

        texts = [
            "source: {}, content: {}".format(
                chunk.metadata['source'], chunk.page_content
            )
            for chunk in chunks
        ]
        if not texts:
            raise ValueError(
                "Cannot build a FAISS index: no chunks were provided and no "
                "index file exists at {!r}".format(self.PATH_IDX)
            )

        word_embeddings = self._encode(texts)
        index = self.build_faiss_index(word_embeddings)
        self.save_faiss_index(index, self.PATH_IDX)
        return index

    def retrieve_data(self, query, TOP_K):
        """Return the index positions of the ``TOP_K`` nearest entries.

        Args:
            query: Query text to embed.
            TOP_K: Number of neighbours to request from the index.

        Returns:
            The first row of the index array returned by ``index.search``,
            i.e. the neighbour positions for the (single) query.
        """
        query_embedding = self._encode(query)
        # Flatten exactly as build_faiss_index does so dimensions match.
        np_query_embedding = nn.Flatten()(query_embedding).detach().numpy()

        distances, indices = self.index.search(np_query_embedding, TOP_K)
        return indices[0]

    def build_faiss_index(self, embeddings):
        """Builds a FAISS index for efficient similarity search.

        Args:
            embeddings: Tensor of embeddings; all dimensions after the first
                are flattened into one vector per row.

        Returns:
            A ``faiss.IndexFlatL2`` containing the flattened embeddings.
        """
        flat = nn.Flatten()(embeddings)
        index = faiss.IndexFlatL2(flat.shape[1])
        np_emb = flat.detach().numpy()
        print("shape index:", np_emb.shape)
        index.add(np_emb)
        return index

    def save_faiss_index(self, index, index_file_path):
        """Saves a FAISS index to a file.

        Args:
            index: The FAISS index to persist.
            index_file_path: Destination file path.
        """
        faiss.write_index(index, index_file_path)
        print(f"FAISS index saved to {index_file_path}")

    def load_faiss_index(self, index_file_path):
        """Loads a FAISS index from a file.

        Args:
            index_file_path: Path of the index file to read.

        Returns:
            The loaded index, or ``None`` when the file does not exist.
        """
        if not os.path.exists(index_file_path):
            return None

        index = faiss.read_index(index_file_path)
        print(f"FAISS index loaded from {index_file_path}")
        return index