import re
import spacy
import nltk
import numpy as np
from keybert import KeyBERT
from sentence_transformers import SentenceTransformer, util
import umap
import hdbscan

# Ensure required NLTK data is downloaded
nltk.download("punkt")


class KeywordExtractor:
    def __init__(self):
        # Domain-tuned sentence embedder (BioBERT) for reranking and clustering,
        # plus a lighter general-purpose model for KeyBERT candidate generation.
        self.biobert = SentenceTransformer("dmis-lab/biobert-base-cased-v1.1")
        self.keybert_model = KeyBERT(model="all-MiniLM-L6-v2")
        self.nlp = spacy.load("en_core_web_sm")
        # Generic scientific-writing terms that should never survive as keywords.
        self.domain_stopwords = set([
            "study", "result", "results", "conclusion", "method", "methods",
            "patients", "data", "analysis", "significant", "treatment",
            "effect", "effects", "disease", "clinical", "used", "use", "using",
            "based", "approach", "research", "paper"
        ])

    def clean_text(self, text):
        # Collapse whitespace runs into single spaces.
        return re.sub(r"\s+", " ", str(text).strip())

    def extract_noun_chunks(self, text):
        # Noun phrases of up to five tokens serve as one source of candidates.
        doc = self.nlp(text)
        return list(set([
            chunk.text.lower().strip()
            for chunk in doc.noun_chunks
            if len(chunk.text.split()) <= 5
        ]))

    def get_keybert_candidates(self, text, top_k=50):
        # KeyBERT with MMR provides a second, diversity-aware candidate source.
        keywords = self.keybert_model.extract_keywords(
            text,
            keyphrase_ngram_range=(1, 3),
            stop_words="english",
            use_mmr=True,
            diversity=0.7,
            top_n=top_k,
        )
        return [kw[0] for kw in keywords]

    def reduce_dimensions_umap(self, embeddings, n_neighbors=15, min_dist=0.1):
        # Clamp n_neighbors so UMAP does not have to truncate it (with a warning)
        # when the candidate set is small.
        n_neighbors = min(n_neighbors, len(embeddings) - 1)
        reducer = umap.UMAP(n_components=2, n_neighbors=n_neighbors, min_dist=min_dist)
        return reducer.fit_transform(embeddings)

    def cluster_with_umap_hdbscan(self, candidates):
        # Deduplicate near-synonymous candidates: cluster them in a reduced
        # embedding space and keep only the phrase closest to each cluster centroid.
        if len(candidates) <= 5:
            return candidates
        embeddings = self.biobert.encode(candidates)
        reduced = self.reduce_dimensions_umap(embeddings)
        clusterer = hdbscan.HDBSCAN(min_cluster_size=2)
        labels = clusterer.fit_predict(reduced)
        final_keywords = []
        for cluster_id in set(labels):
            if cluster_id == -1:  # skip HDBSCAN noise points
                continue
            cluster_indices = np.where(labels == cluster_id)[0]
            cluster_embs = embeddings[cluster_indices]
            center = np.mean(cluster_embs, axis=0)
            sims = util.cos_sim(center, cluster_embs)[0].cpu().numpy()
            rep_idx = cluster_indices[np.argmax(sims)]
            final_keywords.append(candidates[rep_idx])
        return final_keywords

    def rerank_with_biobert(self, text, candidates, top_k=15):
        # Score each candidate by cosine similarity to the full-document embedding.
        if not candidates:
            return []
        text_emb = self.biobert.encode(text, convert_to_tensor=True)
        cand_emb = self.biobert.encode(candidates, convert_to_tensor=True)
        scores = util.cos_sim(text_emb, cand_emb)[0].cpu().numpy()
        ranked = np.argsort(scores)[::-1][:top_k]
        return [candidates[i] for i in ranked]

    def lemmatize_keywords(self, keywords):
        # Reduce surviving phrases to deduplicated content-word lemmas,
        # dropping stop words and generic domain terms.
        return list(set([
            token.lemma_.lower().strip()
            for kw in keywords
            for token in self.nlp(kw)
            if token.is_alpha
            and len(token) > 2
            and not token.is_stop
            and token.lemma_.lower() not in self.domain_stopwords
        ]))

    def extract(self, text: str):
        # Full pipeline: clean -> candidate generation (noun chunks + KeyBERT)
        # -> BioBERT reranking -> UMAP/HDBSCAN deduplication -> lemmatization.
        text = self.clean_text(text)
        noun_chunks = self.extract_noun_chunks(text)
        kb_candidates = self.get_keybert_candidates(text, top_k=50)
        all_candidates = list(set(noun_chunks + kb_candidates))
        reranked = self.rerank_with_biobert(text, all_candidates, top_k=15)
        clustered_keywords = self.cluster_with_umap_hdbscan(reranked)
        final_keywords = self.lemmatize_keywords(clustered_keywords)
        return final_keywords
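

# Minimal usage sketch, not part of the original pipeline: the abstract below is a
# made-up placeholder, and the spaCy/BioBERT/KeyBERT models referenced above are
# assumed to be installed and downloadable.
if __name__ == "__main__":
    sample_abstract = (
        "Metformin therapy was associated with improved glycemic control and reduced "
        "cardiovascular risk in patients with type 2 diabetes mellitus in this cohort study."
    )
    extractor = KeywordExtractor()
    print(extractor.extract(sample_abstract))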