import base64
import logging
import os
from datetime import datetime

import dotenv
import pandas as pd
import pytz
import streamlit as st
from streamlit_tags import st_tags
from PyPDF2 import PdfReader, PdfWriter
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, RecognizerRegistry, RecognizerResult
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

st.set_page_config(
    page_title="Presidio PHI De-identification",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={"About": "https://microsoft.github.io/presidio/"},
)
dotenv.load_dotenv()
logger = logging.getLogger("presidio-streamlit")
def get_timestamp_prefix() -> str:
    """🕒 Stamps time like a boss with Central flair!"""
    central = pytz.timezone("US/Central")
    return datetime.now(central).strftime("%I%M%p_%d-%m-%y").upper()
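# Example output (illustrative date): get_timestamp_prefix() -> "0437PM_21-05-25",
# i.e. 12-hour clock plus AM/PM, then day-month-year, uppercased.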
def nlp_engine_and_registry(model_family: str, model_path: str) -> tuple[object, RecognizerRegistry]:
    """🤖 Fires up NLP engines with a spark of genius!"""
    registry = RecognizerRegistry()
    registry.load_predefined_recognizers()
    if model_family.lower() == "flair":
        from flair.models import SequenceTagger
        # Load the Flair tagger for PERSON/LOCATION/ORGANIZATION. Fully wiring it
        # into Presidio requires wrapping it in a custom EntityRecognizer and
        # calling registry.add_recognizer(...) — see the sketch below.
        tagger = SequenceTagger.load(model_path)
        return tagger, registry
    elif model_family.lower() == "huggingface":
        from transformers import pipeline
        # Token-classification pipeline covering PERSON/LOCATION/ORGANIZATION/DATE_TIME;
        # the same custom-recognizer wrapping applies here.
        nlp = pipeline("ner", model=model_path, tokenizer=model_path)
        return nlp, registry
    raise ValueError(f"Model family {model_family} not supported")
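# A minimal sketch (assumption, not part of this app) of the custom-recognizer
# wrapping mentioned above: subclass EntityRecognizer, map pipeline labels to
# Presidio entities, and register the instance. The HFRecognizer name, the label
# map, and aggregation_strategy="simple" are illustrative choices.
#
#     from presidio_analyzer import EntityRecognizer
#
#     class HFRecognizer(EntityRecognizer):
#         def __init__(self, nlp, label_map):
#             super().__init__(supported_entities=list(set(label_map.values())))
#             self.nlp = nlp  # pipeline("ner", ..., aggregation_strategy="simple")
#             self.label_map = label_map
#
#         def load(self):
#             pass  # model is already loaded in __init__
#
#         def analyze(self, text, entities, nlp_artifacts=None):
#             return [
#                 RecognizerResult(self.label_map[p["entity_group"]], p["start"], p["end"], p["score"])
#                 for p in self.nlp(text)
#                 if p["entity_group"] in self.label_map
#             ]
#
#     registry.add_recognizer(HFRecognizer(nlp, {"PER": "PERSON", "LOC": "LOCATION", "ORG": "ORGANIZATION"}))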
def analyzer_engine(model_family: str, model_path: str) -> AnalyzerEngine:
    """🔍 Unleashes the PHI-sniffing bloodhound!"""
    nlp_engine, registry = nlp_engine_and_registry(model_family, model_path)
    # Note: AnalyzerEngine builds its default spaCy NLP engine here; the model
    # object returned above is kept alive but not passed in directly.
    return AnalyzerEngine(registry=registry)
def get_supported_entities(model_family: str, model_path: str) -> list[str]:
    """📋 Lists what secrets we're hunting—PHI beware!"""
    if model_family.lower() == "huggingface":
        return ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"]
    return ["PERSON", "LOCATION", "ORGANIZATION"]
# Feature Spotlight: 🕵️‍♂️ The Great PHI Hunt Begins!
# With a flick of the wrist, we summon models to sniff out sensitive data in PDFs, making privacy a breeze! 😎
def analyze(analyzer: AnalyzerEngine, text: str, entities: list[str], language: str, score_threshold: float, return_decision_process: bool, allow_list: list[str], deny_list: list[str]) -> list[RecognizerResult]:
    """🦸 Swoops in to spot PHI with laser precision!"""
    # Deny-listed words are caught via an ad hoc recognizer instead of being used
    # to filter model results, so genuine model detections are never discarded.
    ad_hoc = create_ad_hoc_deny_list_recognizer(deny_list)
    results = analyzer.analyze(
        text=text,
        entities=entities + ["GENERIC_PII"] if ad_hoc else entities,
        language=language,
        score_threshold=score_threshold,
        return_decision_process=return_decision_process,
        ad_hoc_recognizers=[ad_hoc] if ad_hoc else None,
    )
    # Allow-listed words are dropped from the findings.
    return [
        result for result in results
        if not any(word.lower() in text[result.start:result.end].lower() for word in allow_list)
    ]
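# Example (hypothetical text and values): a deny-listed phrase surfaces as
# GENERIC_PII alongside the model's own PERSON hit.
#
#     analyzer = analyzer_engine("flair", "flair/ner-english-large")
#     hits = analyze(analyzer, "Dr. Smith discussed Project Nightingale.",
#                    entities=["PERSON"], language="en", score_threshold=0.35,
#                    return_decision_process=False, allow_list=[],
#                    deny_list=["Project Nightingale"])
#     # -> a PERSON result for "Smith" plus a GENERIC_PII result for the phrase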
def anonymize(text: str, operator: str, analyze_results: list[RecognizerResult], mask_char: str = "*", number_of_chars: int = 15):
    """🕵️‍♀️ Cloaks PHI in a disguise—poof, it's gone!"""
    anonymizer = AnonymizerEngine()
    if operator == "mask":
        # The mask operator also requires from_end; mask from the start of each span.
        operator_config = {"DEFAULT": OperatorConfig(operator, {"masking_char": mask_char, "chars_to_mask": number_of_chars, "from_end": False})}
    else:
        operator_config = {"DEFAULT": OperatorConfig(operator, {})}
    # Returns an EngineResult whose .text holds the de-identified string.
    return anonymizer.anonymize(text=text, analyzer_results=analyze_results, operators=operator_config)
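# Example (illustrative): what each sidebar operator does to a single finding.
#
#     res = anonymize("Call John on Monday", "replace",
#                     [RecognizerResult("PERSON", 5, 9, 0.9)])
#     print(res.text)
#     # replace -> "Call <PERSON> on Monday"
#     # redact  -> "Call  on Monday"
#     # mask    -> "Call **** on Monday"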
def create_ad_hoc_deny_list_recognizer(deny_list: list[str] | None = None) -> PatternRecognizer | None:
    """🚨 Builds a naughty list to catch sneaky PHI!"""
    if not deny_list:
        return None
    return PatternRecognizer(supported_entity="GENERIC_PII", deny_list=deny_list)
def save_pdf(pdf_input) -> str:
    """💾 Drops PDFs onto disk like hot cakes!"""
    original_name = pdf_input.name
    with open(original_name, "wb") as f:
        f.write(pdf_input.read())
    return original_name
# Feature Spotlight: 📄 PDF Magic Unleashed!
# Upload a PDF, zap the PHI, and grab a shiny new file—all with a timestamp swagger! ✨
def read_pdf(pdf_path: str) -> str:
    """📖 Slurps up PDF text like a thirsty camel!"""
    reader = PdfReader(pdf_path)
    # Parenthesize the fallback so every page contributes its text plus a
    # newline, even when extract_text() returns None.
    return "".join((page.extract_text() or "") + "\n" for page in reader.pages)
def create_pdf(text: str, input_path: str, output_filename: str) -> str:
    """🖨️ Crafts a fresh PDF with PHI-proof swagger!"""
    # Caveat: PyPDF2 copies the original pages as-is, so the anonymized `text`
    # argument is not re-embedded here; re-rendering it needs a layout library.
    reader = PdfReader(input_path)
    writer = PdfWriter()
    for page in reader.pages:
        writer.add_page(page)
    with open(output_filename, "wb") as f:
        writer.write(f)
    return output_filename
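# A minimal sketch (assumption: reportlab is available; write_text_pdf is a
# hypothetical helper, not called by the app) of re-rendering the anonymized
# text into a genuinely scrubbed PDF:
#
#     from reportlab.lib.pagesizes import letter
#     from reportlab.pdfgen import canvas
#
#     def write_text_pdf(text: str, output_filename: str) -> str:
#         c = canvas.Canvas(output_filename, pagesize=letter)
#         y = 750
#         for line in text.splitlines():
#             if y < 50:        # page is full; start a fresh one
#                 c.showPage()
#                 y = 750
#             c.drawString(50, y, line)
#             y -= 14           # move down one line
#         c.save()
#         return output_filename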
# Sidebar setup
st.sidebar.header("PHI De-identification with Presidio")
model_list = [
    ("flair/ner-english-large", "https://huggingface.co/flair/ner-english-large"),
    ("HuggingFace/obi/deid_roberta_i2b2", "https://huggingface.co/obi/deid_roberta_i2b2"),
    ("HuggingFace/StanfordAIMI/stanford-deidentifier-base", "https://huggingface.co/StanfordAIMI/stanford-deidentifier-base"),
]
st_model = st.sidebar.selectbox("NER model package", [model[0] for model in model_list], index=0, help="Pick your PHI-hunting hero!")
st.sidebar.markdown(f"[View model on HuggingFace]({next(url for model, url in model_list if model == st_model)})")
st_model_package = st_model.split("/")[0]
st_model = st_model if st_model_package.lower() != "huggingface" else "/".join(st_model.split("/")[1:])
analyzer_params = (st_model_package, st_model)
st.sidebar.warning("Models may take a sec to wake up!")
st_operator = st.sidebar.selectbox("De-identification approach", ["replace", "redact", "mask"], index=0, help="Choose how to zap PHI!")
st_threshold = st.sidebar.slider("Acceptance threshold", 0.0, 1.0, 0.35)
st_return_decision_process = st.sidebar.checkbox("Add analysis explanations", False)
with st.sidebar.expander("Allowlists and denylists"):
    st_allow_list = st_tags(label="Add words to allowlist", text="Enter word and press enter.")
    st_deny_list = st_tags(label="Add words to denylist", text="Enter word and press enter.")
# Main panel
col1, col2 = st.columns(2)
with col1:
    st.subheader("Input")
    uploaded_file = st.file_uploader("Upload PDF", type=["pdf"])
    if uploaded_file:
        try:
            pdf_path = save_pdf(uploaded_file)
            if not pdf_path:
                raise ValueError("PDF save flopped!")
            text = read_pdf(pdf_path)
            if not text:
                raise ValueError("No text in that PDF!")
            analyzer = analyzer_engine(*analyzer_params)
            st_analyze_results = analyze(
                analyzer=analyzer,
                text=text,
                entities=get_supported_entities(*analyzer_params),
                language="en",
                score_threshold=st_threshold,
                return_decision_process=st_return_decision_process,
                allow_list=st_allow_list,
                deny_list=st_deny_list,
            )
            phi_types = set(res.entity_type for res in st_analyze_results)
            if phi_types:
                st.success(f"Removed PHI types: {', '.join(phi_types)}")
            else:
                st.info("No PHI detected")
            anonymized_result = anonymize(text=text, operator=st_operator, analyze_results=st_analyze_results)
            timestamp = get_timestamp_prefix()
            output_filename = f"{timestamp}_{uploaded_file.name}"
            pdf_output = create_pdf(anonymized_result.text, pdf_path, output_filename)
            if not pdf_output:
                raise ValueError("PDF creation tanked!")
            with open(output_filename, "rb") as f:
                pdf_bytes = f.read()
            b64 = base64.b64encode(pdf_bytes).decode()
            st.markdown(f'<a href="data:application/pdf;base64,{b64}" download="{output_filename}">Download de-identified PDF</a>', unsafe_allow_html=True)
            # Findings panel sits inside the same try block so it can reuse
            # text and st_analyze_results from the upload above.
            with col2:
                st.subheader("Findings")
                if st_analyze_results:
                    df = pd.DataFrame.from_records([r.to_dict() for r in st_analyze_results])
                    df["text"] = [text[res.start:res.end] for res in st_analyze_results]
                    df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
                        {"entity_type": "Entity type", "text": "Text", "start": "Start", "end": "End", "score": "Confidence"}, axis=1
                    )
                    if st_return_decision_process:
                        analysis_explanation_df = pd.DataFrame.from_records([r.analysis_explanation.to_dict() for r in st_analyze_results])
                        df_subset = pd.concat([df_subset, analysis_explanation_df], axis=1)
                    st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)
                else:
                    st.text("No findings")
            if os.path.exists(pdf_path):
                os.remove(pdf_path)
        except Exception as e:
            st.error(f"Oops, something broke: {str(e)}")
            logger.error(f"Processing error: {str(e)}")