from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import linear_kernel, cosine_similarity
from transformers import BertTokenizer
import re
import unicodedata
import pandas as pd
import numpy as np
import nltk
from nltk.stem.porter import PorterStemmer

class TfidfRecommender:
    def __init__(self, df, id_col, text_col, tokenization_method):
        """Initialize model parameters.

        Args:
            df (pandas.DataFrame): Dataframe containing the items to recommend.
            id_col (str): Name of column containing item IDs.
            text_col (str): Name of column containing the item text.
            tokenization_method (str): ['none','nltk','bert','scibert'] option for tokenization method.
        """
        self.id_col = id_col
        self.text_col = text_col
        self.df = df
        if tokenization_method.lower() not in ["none", "nltk", "bert", "scibert"]:
            raise ValueError(
                'Tokenization method must be one of ["none" | "nltk" | "bert" | "scibert"]'
            )
        self.tokenization_method = tokenization_method.lower()

        # Initialize other variables used in this class
        self.tf = TfidfVectorizer()
        self.tfidf_matrix = dict()
        self.tokens = dict()
        self.stop_words = frozenset()
        self.recommendations = dict()
        self.top_k_recommendations = pd.DataFrame()

    def __clean_text(self, text, for_BERT=False, verbose=False):
        """Clean a single text string: remove line breaks, tabs, and special characters.

        If the text will be tokenized with BERT, casing and selected punctuation
        are preserved; otherwise the text is lower-cased and stripped of all
        remaining punctuation.
        """
        try:
            # Remove newlines and tabs
            clean = text.replace("\n", " ")
            clean = clean.replace("\t", " ")
            clean = clean.replace("\r", " ")
            clean = clean.replace("Â\xa0", "")  # non-breaking space

            # Remove punctuation and special characters, keeping commas, periods,
            # colons, and hyphens. To strip all punctuation instead, use:
            # clean = re.sub(r"([^\s\w]|_)+", "", clean)
            clean = re.sub(r"([^,.:\s\w\-]|_)+", "", clean)

            # Skip further processing if the text will be used in BERT tokenization
            if for_BERT is False:
                # Lower case and remove the remaining punctuation
                clean = clean.lower()
                clean = re.sub(r"([^\s\w]|_)+", "", clean)
        except Exception:
            if verbose:
                print("Cannot clean non-existent text")
            clean = ""
        return clean

    def _clean_df(self):
        """Clean the text column of the dataframe in place."""
        self.df = self.df.replace(np.nan, "", regex=True)

        # Check whether the text will be used for BERT tokenization
        if self.tokenization_method in ["bert", "scibert"]:
            for_BERT = True
        else:
            for_BERT = False

        # Clean the text in the dataframe
        self.df[self.text_col] = self.df[self.text_col].map(
            lambda x: self.__clean_text(x, for_BERT)
        )

    def tokenize_text(self, ngram_range=(1, 3), min_df=0.0):
        """Tokenize the cleaned input text.

        Args:
            ngram_range (tuple of int): The lower and upper boundary of the range of n-values for different n-grams to be extracted.
            min_df (float or int): When building the vocabulary, ignore terms that have a document frequency strictly lower than the given threshold.

        Returns:
            TfidfVectorizer, pandas.Series:
            - Scikit-learn TfidfVectorizer object defined in `.tokenize_text()`.
            - Each row contains tokens for the respective document, separated by spaces.
        """
        self._clean_df()
        vectors = self.df[self.text_col]
        if self.tokenization_method in ["bert", "scibert"]:
            # Vectorizer
            tf = TfidfVectorizer(
                analyzer="word",
                ngram_range=ngram_range,
                min_df=min_df,
                stop_words="english",
            )
            if self.tokenization_method == "bert":
                bert_method = "bert-base-cased"
            elif self.tokenization_method == "scibert":
                bert_method = "allenai/scibert_scivocab_cased"

            # Load the pre-trained BERT tokenizer (vocabulary)
            tokenizer = BertTokenizer.from_pretrained(bert_method)

            # Tokenization
            vectors_tokenized = vectors.copy()
            for i in range(0, len(vectors)):
                vectors_tokenized[i] = " ".join(tokenizer.tokenize(vectors[i]))
        elif self.tokenization_method == "nltk":
            # NLTK stemming
            token_dict = {}  # noqa: F841
            stemmer = PorterStemmer()

            def stem_tokens(tokens, stemmer):
                stemmed = []
                for item in tokens:
                    stemmed.append(stemmer.stem(item))
                return stemmed

            def tokenize(text):
                tokens = nltk.word_tokenize(text)
                stems = stem_tokens(tokens, stemmer)
                return stems

            # The custom tokenizer is applied inside the fit function
            tf = TfidfVectorizer(
                tokenizer=tokenize,
                analyzer="word",
                ngram_range=ngram_range,
                min_df=min_df,
                stop_words="english",
            )
            vectors_tokenized = vectors
        elif self.tokenization_method == "none":
            # No tokenization applied
            tf = TfidfVectorizer(
                analyzer="word",
                ngram_range=ngram_range,
                min_df=min_df,
                stop_words="english",
            )
            vectors_tokenized = vectors

        # Save to class variable
        self.tf = tf
        return tf, vectors_tokenized

    def fit(self, tf, vectors_tokenized):
        """Fit the TF-IDF vectorizer on the tokenized text and store the resulting matrix."""
        self.tfidf_matrix = tf.fit_transform(vectors_tokenized)

    def get_tokens(self):
        """Return the vocabulary learned by the fitted vectorizer."""
        try:
            self.tokens = self.tf.vocabulary_
        except Exception:
            self.tokens = "Run .tokenize_text() and .fit() first"
        return self.tokens

    def get_stop_words(self):
        """Return the stop words used by the fitted vectorizer."""
        try:
            self.stop_words = self.tf.get_stop_words()
        except Exception:
            self.stop_words = "Run .tokenize_text() and .fit() first"
        return self.stop_words

    def recommend_k_items(self, title, k):
        """Return the IDs of the k items most similar to the item with the given title."""
        # Locate the query item by its title
        idx = self.df[self.df["title"] == title].index[0]

        # Cosine similarity between the query item and all items
        cosine_sim = cosine_similarity(self.tfidf_matrix[int(idx)], self.tfidf_matrix)

        # Sort by similarity, drop the query item itself, and keep the top k
        similarity_scores = list(enumerate(cosine_sim[0]))
        similarity_scores = sorted(similarity_scores, key=lambda x: x[1], reverse=True)
        similarity_scores = similarity_scores[1 : k + 1]
        item_indices = [i[0] for i in similarity_scores]
        return self.df.iloc[item_indices][self.id_col]
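

# A minimal usage sketch, not part of the original module: it assumes a small
# pandas DataFrame with hypothetical "id", "title", and "description" columns
# and uses the "none" tokenization option. It illustrates the intended call
# order: tokenize_text() -> fit() -> recommend_k_items().
if __name__ == "__main__":
    sample_df = pd.DataFrame(
        {
            "id": [1, 2, 3],
            "title": ["Alien", "Aliens", "Toy Story"],
            "description": [
                "A crew of astronauts encounters a hostile alien creature in deep space.",
                "Marines return to the alien planet to fight the creatures.",
                "Toys come to life and go on an adventure when their owner is away.",
            ],
        }
    )

    recommender = TfidfRecommender(
        sample_df, id_col="id", text_col="description", tokenization_method="none"
    )
    tf, vectors_tokenized = recommender.tokenize_text(ngram_range=(1, 2), min_df=0.0)
    recommender.fit(tf, vectors_tokenized)

    # IDs of the 2 items most similar to "Alien"
    print(recommender.recommend_k_items("Alien", k=2))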