"""Streamlit app: detect and de-identify PHI in an uploaded PDF with Presidio.

The sidebar selects the NER model, the de-identification operator, the score
threshold and optional allow/deny lists. The main panel takes a PDF upload,
extracts its text, runs the analyzer, and shows both the de-identified text
and a table of findings.
"""

import logging
import os
import base64
import datetime

import dotenv
import pandas as pd
import streamlit as st
from streamlit_tags import st_tags
from PyPDF2 import PdfReader, PdfWriter

from presidio_helpers import (
    analyzer_engine,
    get_supported_entities,
    analyze,
    anonymize,
)


def _split_model_choice(choice):
    """Split a sidebar model choice into ``(package, model_name)``.

    Choices prefixed with ``HuggingFace/`` have that prefix stripped from the
    model name; every other choice keeps its full string as the model name.
    """
    package, _, remainder = choice.partition("/")
    # Exact comparison: the previous `not in ("huggingface")` was a substring
    # test against a plain string, not membership in a tuple.
    if package.lower() == "huggingface":
        return package, remainder
    return package, choice


def _extract_pdf_text(reader):
    """Return the text of every page in *reader*, one trailing newline per page.

    Pages for which ``extract_text()`` yields nothing contribute an empty line
    instead of raising on ``None + str``.
    """
    return "".join(f"{page.extract_text() or ''}\n" for page in reader.pages)


def _anonymized_text(result):
    """Return the de-identified text carried by an ``anonymize`` result.

    NOTE(review): assumes the helper returns an object exposing ``.text``
    (as Presidio's anonymizer engine result does) - confirm against
    ``presidio_helpers.anonymize``. Falls back to ``str(result)`` otherwise.
    """
    text = getattr(result, "text", None)
    return text if isinstance(text, str) else str(result)


def _output_filename(original_name):
    """Build a timestamped ``.txt`` download name from the uploaded file name."""
    # basename() guards against a client-supplied name containing directories.
    stem = os.path.splitext(os.path.basename(original_name))[0]
    timestamp = datetime.datetime.now().strftime("%I%M%p_%d-%m-%y")
    return f"{timestamp}_{stem}.txt"


def _render_findings(container, results, text, with_explanations):
    """Render the findings table (or a placeholder) inside *container*."""
    with container:
        st.subheader("Findings")
        if not results:
            st.text("No findings")
            return

        df = pd.DataFrame.from_records([r.to_dict() for r in results])
        df["text"] = [text[res.start:res.end] for res in results]
        df_subset = df[["entity_type", "text", "start", "end", "score"]].rename(
            columns={
                "entity_type": "Entity type",
                "text": "Text",
                "start": "Start",
                "end": "End",
                "score": "Confidence",
            }
        )
        if with_explanations:
            analysis_explanation_df = pd.DataFrame.from_records(
                [r.analysis_explanation.to_dict() for r in results]
            )
            df_subset = pd.concat([df_subset, analysis_explanation_df], axis=1)
        st.dataframe(df_subset.reset_index(drop=True), use_container_width=True)


def _process_upload(uploaded_file, input_col, findings_col):
    """Analyze and de-identify *uploaded_file*, rendering into the two columns.

    Reads the sidebar settings defined at module level (``analyzer_params``,
    ``st_operator``, ``st_threshold``, ``st_return_decision_process``,
    ``st_allow_list``, ``st_deny_list``).

    Raises:
        Exception: re-raised after a user-facing message when the model
            cannot be loaded; any other failure propagates to the caller.
    """
    with input_col:
        text = _extract_pdf_text(PdfReader(uploaded_file))
        if not text.strip():
            # Scanned/image-only PDFs have no text layer to analyze.
            st.warning("No extractable text was found in the uploaded PDF.")
            return

        # Initialize analyzer
        try:
            analyzer = analyzer_engine(*analyzer_params)
        except Exception as e:
            st.error(f"Failed to load model: {str(e)}")
            st.info("Ensure models are downloaded and check network/permissions.")
            raise

        # Analyze
        st_analyze_results = analyze(
            analyzer=analyzer,
            text=text,
            entities=get_supported_entities(*analyzer_params),
            language="en",
            score_threshold=st_threshold,
            return_decision_process=st_return_decision_process,
            allow_list=st_allow_list,
            deny_list=st_deny_list,
        )

        # Sorted so the message is stable across reruns.
        phi_types = sorted({res.entity_type for res in st_analyze_results})
        if phi_types:
            st.success(f"Removed PHI types: {', '.join(phi_types)}")
        else:
            st.info("No PHI detected")

        # Anonymize
        anonymized_result = anonymize(
            text=text,
            operator=st_operator,
            analyze_results=st_analyze_results,
        )
        deidentified_text = _anonymized_text(anonymized_result)

        # The output must be built from the anonymized text. Copying the
        # original pages into a new PDF (as was done before) hands the PHI
        # straight back to the user, and writing that copy to the working
        # directory leaves PHI on disk. PyPDF2 cannot author new page text,
        # so the result is delivered as plain text, entirely in memory.
        st.text_area(
            label="De-identified text",
            value=deidentified_text,
            height=400,
        )
        st.download_button(
            label="Download de-identified text",
            data=deidentified_text.encode("utf-8"),
            file_name=_output_filename(uploaded_file.name),
            mime="text/plain",
        )

    _render_findings(
        findings_col, st_analyze_results, text, st_return_decision_process
    )


st.set_page_config(
    page_title="Presidio PHI De-identification",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={"About": "https://microsoft.github.io/presidio/"},
)

dotenv.load_dotenv()
logger = logging.getLogger("presidio-streamlit")

# Sidebar
st.sidebar.header("PHI De-identification with Presidio")

model_help_text = "Select Named Entity Recognition (NER) model for PHI detection."
model_list = [
    ("flair/ner-english-large", "https://huggingface.co/flair/ner-english-large"),
    (
        "HuggingFace/obi/deid_roberta_i2b2",
        "https://huggingface.co/obi/deid_roberta_i2b2",
    ),
    (
        "HuggingFace/StanfordAIMI/stanford-deidentifier-base",
        "https://huggingface.co/StanfordAIMI/stanford-deidentifier-base",
    ),
]

st_model = st.sidebar.selectbox(
    "NER model package",
    [model[0] for model in model_list],
    index=0,
    help=model_help_text,
)

# Display HuggingFace link for selected model
selected_model_url = next(url for model, url in model_list if model == st_model)
st.sidebar.markdown(f"[View model on HuggingFace]({selected_model_url})")

# Extract model package
st_model_package, st_model = _split_model_choice(st_model)
analyzer_params = (st_model_package, st_model)

st.sidebar.warning("Note: Models might take some time to download on first run.")

st_operator = st.sidebar.selectbox(
    "De-identification approach",
    ["replace", "redact", "mask"],
    index=0,
    help="Select PHI manipulation method.",
)

st_threshold = st.sidebar.slider(
    label="Acceptance threshold",
    min_value=0.0,
    max_value=1.0,
    value=0.35,
)

st_return_decision_process = st.sidebar.checkbox(
    "Add analysis explanations",
    value=False,
)

# Allow and deny lists
with st.sidebar.expander("Allowlists and denylists", expanded=False):
    st_allow_list = st_tags(
        label="Add words to allowlist", text="Enter word and press enter."
    )
    st_deny_list = st_tags(
        label="Add words to denylist", text="Enter word and press enter."
    )

# Main panel
col1, col2 = st.columns(2)

with col1:
    st.subheader("Input")
    uploaded_file = st.file_uploader("Upload PDF", type=["pdf"])

if uploaded_file:
    try:
        _process_upload(uploaded_file, col1, col2)
    except Exception as e:
        # Top-level boundary: report to the user and keep the traceback in logs.
        st.error(f"An error occurred: {str(e)}")
        logger.exception("Processing error: %s", e)