import logging
import os
import re
import tempfile

from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError, RepositoryNotFoundError

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
# Files/extensions to definitely include
INCLUDE_PATTERNS = [
    ".py",
    "requirements.txt",
    "Dockerfile",
    ".js",
    ".jsx",
    ".ts",
    ".tsx",
    ".html",
    ".css",
    ".svelte",
    ".vue",
    ".json",
    ".yaml",
    ".yml",
    ".toml",
    "Procfile",
    ".sh",
]
# Files/extensions/folders to ignore
IGNORE_PATTERNS = [
    ".git",
    ".hfignore",
    "README.md",
    "LICENSE",
    "__pycache__",
    ".ipynb_checkpoints",
    ".png",
    ".jpg",
    ".jpeg",
    ".gif",
    ".svg",
    ".ico",
    ".mp3",
    ".wav",
    ".mp4",
    ".mov",
    ".avi",
    ".onnx",
    ".pt",
    ".pth",
    ".bin",
    ".safetensors",
    ".tflite",
    ".pickle",
    ".pkl",
    ".joblib",
    ".parquet",
    ".csv",
    ".tsv",
    ".zip",
    ".tar.gz",
    ".gz",
    ".ipynb",
    ".DS_Store",
    "node_modules",
]
# Regex to find potential Hugging Face model IDs (e.g., "org/model-name", "user/model-name")
# quoted with single or double quotes. This is a simple heuristic and might catch
# non-model strings or miss complex cases.
HF_MODEL_ID_PATTERN = re.compile(r"([\"'])([\w\-.]+/[\w\-.]+)\1")
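# Illustrative matches (assumed example strings, not taken from any real Space):
#   'model_id = "sentence-transformers/all-MiniLM-L6-v2"' -> captures "sentence-transformers/all-MiniLM-L6-v2"
#   'cfg = "configs/model.yaml"' -> also matches here, but extract_hf_model_ids() drops it (contains ".")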

# Max length for model descriptions to keep prompts manageable
MAX_MODEL_DESC_LENGTH = 1500

SUMMARY_FILENAME = "summary_highlights.md"
PRIVACY_FILENAME = "privacy_report.md"


def _is_relevant_file(filename):
    """Check if a file should be included based on patterns."""
    # Ignore files matching ignore patterns (case-insensitive check for some)
    lower_filename = filename.lower()
    if any(
        pattern in lower_filename
        for pattern in [".git", ".hfignore", "readme.md", "license"]
    ):
        return False
    if any(
        filename.endswith(ext) for ext in IGNORE_PATTERNS if ext.startswith(".")
    ):  # Check extensions
        return False
    if any(
        part == pattern
        for part in filename.split("/")
        for pattern in IGNORE_PATTERNS
        if "." not in pattern and "/" not in pattern
    ):  # Check directory/file names
        return False
    if filename in IGNORE_PATTERNS:  # Check full filenames
        return False

    # Include files matching include patterns
    if any(filename.endswith(ext) for ext in INCLUDE_PATTERNS if ext.startswith(".")):
        return True
    if any(filename == pattern for pattern in INCLUDE_PATTERNS if "." not in pattern):
        return True

    # Default to False if not explicitly included (safer)
    # logging.debug(f"File '{filename}' excluded by default.")
    return False
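# Illustrative expectations for _is_relevant_file (assumed example paths):
#   _is_relevant_file("app.py")             -> True   (".py" is an INCLUDE_PATTERNS extension)
#   _is_relevant_file("assets/logo.png")    -> False  (".png" is an IGNORE_PATTERNS extension)
#   _is_relevant_file("node_modules/x.js")  -> False  (ignored directory name wins over the ".js" extension)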


def get_space_code_files(space_id: str) -> dict[str, str]:
    """
    Downloads relevant code and configuration files from a Hugging Face Space.

    Args:
        space_id: The ID of the Hugging Face Space (e.g., 'gradio/hello_world').

    Returns:
        A dictionary where keys are filenames and values are file contents as strings.
        Returns an empty dictionary if the space is not found or has no relevant files.
    """
    code_files = {}
    api = HfApi()
    try:
        logging.info(f"Fetching file list for Space: {space_id}")
        repo_files = api.list_repo_files(repo_id=space_id, repo_type="space")
        logging.info(f"Found {len(repo_files)} total files in {space_id}.")

        relevant_files = [f for f in repo_files if _is_relevant_file(f)]
        logging.info(f"Identified {len(relevant_files)} relevant files for download.")

        for filename in relevant_files:
            try:
                logging.debug(f"Downloading {filename} from {space_id}...")
                file_path = hf_hub_download(
                    repo_id=space_id,
                    filename=filename,
                    repo_type="space",
                    # Consider adding use_auth_token=os.getenv("HF_TOKEN") if accessing private spaces later
                )
                with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                    content = f.read()
                code_files[filename] = content
                logging.debug(f"Successfully read content of {filename}")
            except EntryNotFoundError:
                logging.warning(
                    f"File {filename} listed but not found in repo {space_id}."
                )
            except UnicodeDecodeError:
                logging.warning(
                    f"Could not decode file {filename} from {space_id} as UTF-8. Skipping."
                )
            except OSError as e:
                logging.warning(f"OS error reading file {filename} from cache: {e}")
            except Exception as e:
                logging.error(
                    f"Unexpected error downloading or reading file {filename} from {space_id}: {e}"
                )
    except RepositoryNotFoundError:
        logging.error(f"Space repository '{space_id}' not found.")
        return {}
    except Exception as e:
        logging.error(f"Failed to list or process files for space {space_id}: {e}")
        return {}

    logging.info(
        f"Successfully retrieved content for {len(code_files)} files from {space_id}."
    )
    return code_files
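# Minimal usage sketch (uses the public example Space referenced at the bottom of this file):
#   files = get_space_code_files("gradio/hello_world")
#   app_source = files.get("app.py", "")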


def extract_hf_model_ids(code_files: dict[str, str]) -> set[str]:
    """
    Extracts potential Hugging Face model IDs mentioned in code files.

    Args:
        code_files: Dictionary of {filename: content}.

    Returns:
        A set of unique potential model IDs found.
    """
    potential_ids = set()
    for filename, content in code_files.items():
        # Limit search to relevant file types
        if filename.endswith((".py", ".json", ".yaml", ".yml", ".toml", ".md")):
            try:
                matches = HF_MODEL_ID_PATTERN.findall(content)
                for _, model_id in matches:
                    # Basic validation: must contain exactly one '/'
                    if model_id.count("/") == 1:
                        # Avoid adding common paths that look like IDs
                        if not any(
                            part in model_id.lower()
                            for part in ["http", "www", "@", " ", ".", ":"]
                        ):  # Check that '/' is the only separator
                            if len(model_id) < 100:  # Avoid overly long strings
                                potential_ids.add(model_id)
            except Exception as e:
                logging.warning(f"Regex error processing file {filename}: {e}")

    logging.info(f"Extracted {len(potential_ids)} potential model IDs.")
    # Add simple filter for very common false positives if needed
    # potential_ids = {id for id in potential_ids if id not in ['user/repo']}
    return potential_ids
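# Illustrative sketch (hypothetical file content):
#   extract_hf_model_ids({"app.py": 'model_id = "sentence-transformers/all-MiniLM-L6-v2"'})
#   -> {"sentence-transformers/all-MiniLM-L6-v2"}
# Note that IDs containing "." (e.g. version suffixes) are intentionally filtered out above.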


def get_model_descriptions(model_ids: set[str]) -> dict[str, str]:
    """
    Fetches the README.md content (description) for a set of model IDs.

    Args:
        model_ids: A set of Hugging Face model IDs.

    Returns:
        A dictionary mapping model_id to its description string (or an error message).
    """
    descriptions = {}
    if not model_ids:
        return descriptions

    logging.info(f"Fetching descriptions for {len(model_ids)} models...")
    for model_id in model_ids:
        try:
            # Check if the model exists first (optional but good practice)
            # api.model_info(model_id)

            # Download README.md (hf_hub_download raises EntryNotFoundError if it doesn't exist)
            readme_path = hf_hub_download(
                repo_id=model_id,
                filename="README.md",
                repo_type="model",
                # Add token if needing to access private/gated models - unlikely for Space analysis
                # use_auth_token=os.getenv("HF_TOKEN"),
            )
            with open(readme_path, "r", encoding="utf-8", errors="ignore") as f:
                description = f.read()
            descriptions[model_id] = description[:MAX_MODEL_DESC_LENGTH] + (
                "... [truncated]" if len(description) > MAX_MODEL_DESC_LENGTH else ""
            )
            logging.debug(f"Successfully fetched description for {model_id}")
        except RepositoryNotFoundError:
            logging.warning(f"Model repository '{model_id}' not found.")
            descriptions[model_id] = "[Model repository not found]"
        except EntryNotFoundError:
            logging.warning(f"README.md not found in model repository '{model_id}'.")
            descriptions[model_id] = "[README.md not found in model repository]"
        except Exception as e:
            logging.error(f"Error fetching description for model '{model_id}': {e}")
            descriptions[model_id] = f"[Error fetching description: {e}]"

    logging.info(f"Finished fetching descriptions for {len(descriptions)} models.")
    return descriptions
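# Usage sketch (illustrative ID; each README is truncated to MAX_MODEL_DESC_LENGTH characters):
#   descs = get_model_descriptions({"sentence-transformers/all-MiniLM-L6-v2"})
#   print(descs["sentence-transformers/all-MiniLM-L6-v2"][:80])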


def list_cached_spaces(dataset_id: str, hf_token: str | None) -> list[str]:
    """Lists the space IDs (owner/name) that have cached reports in the dataset repository."""
    if not hf_token:
        logging.warning("HF Token not provided, cannot list cached spaces.")
        return []
    try:
        api = HfApi(token=hf_token)
        # Get all filenames in the dataset repository
        all_files = api.list_repo_files(repo_id=dataset_id, repo_type="dataset")

        # Extract unique directory paths that look like owner/space_name
        # by checking if they contain our specific report files.
        space_ids = set()
        for f_path in all_files:
            # Check if the file is one of our report files
            if f_path.endswith(f"/{PRIVACY_FILENAME}") or f_path.endswith(
                f"/{SUMMARY_FILENAME}"
            ):
                # Extract the directory path part (owner/space_name)
                parts = f_path.split("/")
                if len(parts) == 3:  # Expecting owner/space_name/filename.md
                    owner_slash_space_name = "/".join(parts[:-1])
                    # Basic validation: owner and space name shouldn't start with '.'
                    if not parts[0].startswith(".") and not parts[1].startswith("."):
                        space_ids.add(owner_slash_space_name)

        sorted_space_ids = sorted(space_ids)
        logging.info(
            f"Found {len(sorted_space_ids)} cached space reports in {dataset_id} via HfApi."
        )
        return sorted_space_ids
    except RepositoryNotFoundError:
        logging.warning(
            f"Dataset {dataset_id} not found or empty when listing cached spaces."
        )
        return []
    except Exception as e:
        logging.error(f"Error listing cached spaces in {dataset_id} via HfApi: {e}")
        return []  # Return empty list on error
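# Expected dataset layout (assumed example paths), e.g.:
#   some-owner/some-space/summary_highlights.md
#   some-owner/some-space/privacy_report.md
# -> list_cached_spaces(dataset_id, hf_token) would return ["some-owner/some-space"]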


def check_report_exists(space_id: str, dataset_id: str, hf_token: str | None) -> bool:
    """Checks if report files already exist in the target dataset repo using HfApi."""
    print(
        f"[Debug Cache Check] Checking for space_id: '{space_id}' in dataset: '{dataset_id}'"
    )  # DEBUG
    if not hf_token:
        logging.warning("HF Token not provided, cannot check dataset cache.")
        print("[Debug Cache Check] No HF Token, returning False.")  # DEBUG
        return False
    try:
        api = HfApi(token=hf_token)
        # List ALL files in the repo
        print(f"[Debug Cache Check] Listing ALL files in repo '{dataset_id}'")  # DEBUG
        all_repo_files = api.list_repo_files(repo_id=dataset_id, repo_type="dataset")
        # DEBUG: Optionally print a subset if the list is huge
        # print(f"[Debug Cache Check] First 10 files returned by API: {all_repo_files[:10]}")

        # Construct the exact paths we expect for the target space_id
        expected_summary_path = f"{space_id}/{SUMMARY_FILENAME}"
        expected_privacy_path = f"{space_id}/{PRIVACY_FILENAME}"
        print(
            f"[Debug Cache Check] Expecting summary file: '{expected_summary_path}'"
        )  # DEBUG
        print(
            f"[Debug Cache Check] Expecting privacy file: '{expected_privacy_path}'"
        )  # DEBUG

        # Check if both expected paths exist in the full list of files
        summary_exists = expected_summary_path in all_repo_files
        privacy_exists = expected_privacy_path in all_repo_files
        exists = summary_exists and privacy_exists
        print(
            f"[Debug Cache Check] Summary exists in full list: {summary_exists}"
        )  # DEBUG
        print(
            f"[Debug Cache Check] Privacy exists in full list: {privacy_exists}"
        )  # DEBUG
        print(f"[Debug Cache Check] Overall exists check result: {exists}")  # DEBUG
        return exists
    except RepositoryNotFoundError:
        logging.warning(
            f"Dataset repository {dataset_id} not found or not accessible during check."
        )
        print(
            f"[Debug Cache Check] Repository {dataset_id} not found, returning False."
        )  # DEBUG
        return False
    except Exception as e:
        print(f"[Debug Cache Check] Exception caught: {type(e).__name__}: {e}")  # DEBUG
        # Note: a 404 check based on path_in_repo is not applicable here;
        # we rely on RepositoryNotFoundError or the general Exception instead.
        logging.error(
            f"Error checking dataset {dataset_id} for {space_id} via HfApi: {e}"
        )
        print("[Debug Cache Check] Other exception, returning False.")  # DEBUG
        return False  # Treat errors as cache miss
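# Usage sketch (hypothetical dataset ID):
#   cached = check_report_exists("owner/space", "owner/privacy-reports", os.getenv("HF_TOKEN"))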


def download_cached_reports(
    space_id: str, dataset_id: str, hf_token: str | None
) -> dict[str, str]:
    """Downloads cached reports from the dataset repo. Raises an error on failure."""
    if not hf_token:
        raise ValueError("HF Token required to download cached reports.")

    logging.info(
        f"Attempting to download cached reports for {space_id} from {dataset_id}..."
    )
    reports = {}
    # Define paths relative to the dataset root for hf_hub_download
    summary_repo_path = f"{space_id}/{SUMMARY_FILENAME}"
    privacy_repo_path = f"{space_id}/{PRIVACY_FILENAME}"

    try:
        # Download summary
        summary_path_local = hf_hub_download(
            repo_id=dataset_id,
            filename=summary_repo_path,
            repo_type="dataset",
            token=hf_token,
        )
        with open(summary_path_local, "r", encoding="utf-8") as f:
            reports["summary"] = f.read()
        logging.info(f"Successfully downloaded cached summary for {space_id}.")

        # Download privacy report
        privacy_path_local = hf_hub_download(
            repo_id=dataset_id,
            filename=privacy_repo_path,
            repo_type="dataset",
            token=hf_token,
        )
        with open(privacy_path_local, "r", encoding="utf-8") as f:
            reports["privacy"] = f.read()
        logging.info(f"Successfully downloaded cached privacy report for {space_id}.")

        return reports
    except EntryNotFoundError as e:
        # More specific error based on which file failed
        missing_file = (
            summary_repo_path if summary_repo_path in str(e) else privacy_repo_path
        )
        logging.error(
            f"Cache download error: Report file {missing_file} not found for {space_id} in {dataset_id}. {e}"
        )
        raise FileNotFoundError(
            f"Cached report file {missing_file} not found for {space_id}"
        ) from e
    except RepositoryNotFoundError as e:
        logging.error(f"Cache download error: Dataset repo {dataset_id} not found. {e}")
        raise FileNotFoundError(f"Dataset repo {dataset_id} not found") from e
    except Exception as e:
        logging.error(
            f"Unexpected error downloading cached reports for {space_id} from {dataset_id}: {e}"
        )
        raise IOError(f"Failed to download cached reports for {space_id}") from e
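# Usage sketch (hypothetical dataset ID), treating any failure as a cache miss:
#   try:
#       reports = download_cached_reports("owner/space", "owner/privacy-reports", os.getenv("HF_TOKEN"))
#   except (ValueError, FileNotFoundError, IOError):
#       reports = None  # fall back to regenerating the reports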


def upload_reports_to_dataset(
    space_id: str,
    summary_report: str,
    detailed_report: str,
    dataset_id: str,
    hf_token: str | None,
):
    """Uploads the generated reports to the specified dataset repository."""
    if not hf_token:
        logging.warning("HF Token not provided, skipping dataset report upload.")
        return

    logging.info(
        f"Attempting to upload reports for {space_id} to dataset {dataset_id}..."
    )
    api = HfApi(token=hf_token)
    # Sanitize space_id for path safety (though HF Hub usually handles this)
    safe_space_id = space_id.replace("..", "")

    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            summary_path_local = os.path.join(tmpdir, SUMMARY_FILENAME)
            privacy_path_local = os.path.join(tmpdir, PRIVACY_FILENAME)

            with open(summary_path_local, "w", encoding="utf-8") as f:
                f.write(summary_report)
            with open(privacy_path_local, "w", encoding="utf-8") as f:
                f.write(detailed_report)

            commit_message = f"Add privacy analysis reports for Space: {safe_space_id}"

            repo_url = api.create_repo(
                repo_id=dataset_id,
                repo_type="dataset",
                exist_ok=True,
            )
            logging.info(f"Ensured dataset repo {repo_url} exists.")

            api.upload_file(
                path_or_fileobj=summary_path_local,
                path_in_repo=f"{safe_space_id}/{SUMMARY_FILENAME}",
                repo_id=dataset_id,
                repo_type="dataset",
                commit_message=commit_message,
            )
            logging.info(f"Successfully uploaded summary report for {safe_space_id}.")

            api.upload_file(
                path_or_fileobj=privacy_path_local,
                path_in_repo=f"{safe_space_id}/{PRIVACY_FILENAME}",
                repo_id=dataset_id,
                repo_type="dataset",
                commit_message=commit_message,
            )
            logging.info(
                f"Successfully uploaded detailed privacy report for {safe_space_id}."
            )
    except Exception as e:
        logging.error(
            f"Failed to upload reports for {safe_space_id} to dataset {dataset_id}: {e}"
        )
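# Usage sketch (hypothetical arguments; writes <space_id>/summary_highlights.md and
# <space_id>/privacy_report.md into the dataset repo):
#   upload_reports_to_dataset(
#       space_id="owner/space",
#       summary_report=summary_md,
#       detailed_report=privacy_md,
#       dataset_id="owner/privacy-reports",
#       hf_token=os.getenv("HF_TOKEN"),
#   )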


# Example usage (for testing)
# if __name__ == '__main__':
#     # Make sure HF_TOKEN is set if accessing private spaces or for higher rate limits
#     from dotenv import load_dotenv
#     load_dotenv()
#     # test_space = "gradio/hello_world"
#     test_space = "huggingface-projects/diffusers-gallery"  # A more complex example
#     # test_space = "nonexistent/space"  # Test not found
#     files_content = get_space_code_files(test_space)
#     if files_content:
#         print(f"\n--- Files retrieved from {test_space} ---")
#         for name in files_content.keys():
#             print(f"- {name}")
#         # print("\n--- Content of app.py (first 200 chars) ---")
#         # print(files_content.get("app.py", "app.py not found")[:200])
#     else:
#         print(f"Could not retrieve files from {test_space}")