import re
import spacy
import nltk
import numpy as np
from keybert import KeyBERT
from sentence_transformers import SentenceTransformer, util
import umap
import hdbscan

# Ensure required NLTK data is downloaded
nltk.download("punkt")


class KeywordExtractor:
    def __init__(self):
        # BioBERT encoder for biomedical similarity scoring
        self.biobert = SentenceTransformer("dmis-lab/biobert-base-cased-v1.1")
        # Lightweight general-purpose model used by KeyBERT for candidate generation
        self.keybert_model = KeyBERT(model="all-MiniLM-L6-v2")
        self.nlp = spacy.load("en_core_web_sm")
        # Generic scientific-writing terms that should not surface as keywords
        self.domain_stopwords = set([
            "study", "result", "results", "conclusion", "method", "methods", "patients",
            "data", "analysis", "significant", "treatment", "effect", "effects", "disease",
            "clinical", "used", "use", "using", "based", "approach", "research", "paper"
        ])

    def clean_text(self, text):
        # Collapse runs of whitespace into single spaces
        return re.sub(r"\s+", " ", str(text).strip())

    def extract_noun_chunks(self, text):
        # Candidate phrases: spaCy noun chunks of at most five words
        doc = self.nlp(text)
        return list(set(
            chunk.text.lower().strip()
            for chunk in doc.noun_chunks
            if len(chunk.text.split()) <= 5
        ))

    def get_keybert_candidates(self, text, top_k=50):
        # Candidate phrases: KeyBERT keyphrases (1-3 words), with MMR for diversity
        keywords = self.keybert_model.extract_keywords(
            text,
            keyphrase_ngram_range=(1, 3),
            stop_words="english",
            use_mmr=True,
            diversity=0.7,
            top_n=top_k,
        )
        return [kw[0] for kw in keywords]

    def reduce_dimensions_umap(self, embeddings, n_neighbors=15, min_dist=0.1):
        # Project embeddings to 2D; clamp n_neighbors so small candidate sets
        # do not exceed the number of samples
        n_neighbors = min(n_neighbors, len(embeddings) - 1)
        reducer = umap.UMAP(n_components=2, n_neighbors=n_neighbors, min_dist=min_dist)
        return reducer.fit_transform(embeddings)

    def cluster_with_umap_hdbscan(self, candidates):
        # Collapse near-duplicate candidates: cluster their embeddings and keep
        # one representative per cluster
        if len(candidates) <= 5:
            return candidates
        embeddings = self.biobert.encode(candidates)
        reduced = self.reduce_dimensions_umap(embeddings)
        clusterer = hdbscan.HDBSCAN(min_cluster_size=2)
        labels = clusterer.fit_predict(reduced)
        final_keywords = []
        for cluster_id in set(labels):
            if cluster_id == -1:  # HDBSCAN noise label
                continue
            cluster_indices = np.where(labels == cluster_id)[0]
            cluster_embs = embeddings[cluster_indices]
            center = np.mean(cluster_embs, axis=0)
            # Representative = candidate closest to the cluster centroid
            sims = util.cos_sim(center, cluster_embs)[0].cpu().numpy()
            rep_idx = cluster_indices[np.argmax(sims)]
            final_keywords.append(candidates[rep_idx])
        return final_keywords

    def rerank_with_biobert(self, text, candidates, top_k=15):
        # Rank candidates by cosine similarity to the full text under BioBERT
        if not candidates:
            return []
        text_emb = self.biobert.encode(text, convert_to_tensor=True)
        cand_emb = self.biobert.encode(candidates, convert_to_tensor=True)
        scores = util.cos_sim(text_emb, cand_emb)[0].cpu().numpy()
        ranked = np.argsort(scores)[::-1][:top_k]
        return [candidates[i] for i in ranked]

    def lemmatize_keywords(self, keywords):
        # Break keyphrases into lemmatised content words, dropping short tokens,
        # spaCy stopwords, and domain stopwords
        return list(set(
            token.lemma_.lower().strip()
            for kw in keywords
            for token in self.nlp(kw)
            if token.is_alpha
            and len(token) > 2
            and not token.is_stop
            and token.lemma_.lower() not in self.domain_stopwords
        ))

    def extract(self, text: str):
        # Full pipeline: clean -> noun chunks + KeyBERT candidates ->
        # BioBERT rerank -> UMAP/HDBSCAN dedup -> lemmatise
        text = self.clean_text(text)
        noun_chunks = self.extract_noun_chunks(text)
        kb_candidates = self.get_keybert_candidates(text, top_k=50)
        all_candidates = list(set(noun_chunks + kb_candidates))
        reranked = self.rerank_with_biobert(text, all_candidates, top_k=15)
        clustered_keywords = self.cluster_with_umap_hdbscan(reranked)
        final_keywords = self.lemmatize_keywords(clustered_keywords)
        return final_keywords
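

# A minimal usage sketch (assumptions: the sample abstract below is purely
# illustrative, and the BioBERT, MiniLM, and en_core_web_sm models have
# already been downloaded).
if __name__ == "__main__":
    extractor = KeywordExtractor()
    sample_abstract = (
        "We evaluated the efficacy of metformin in patients with type 2 diabetes "
        "mellitus and assessed changes in glycemic control over a 12-week period."
    )
    print(extractor.extract(sample_abstract))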