#!/usr/bin/env python3 """ Data Preprocessing Pipeline for News Dashboard Handles preprocessing of scraped content for translation, summarization, and other operations """ import re import logging from typing import List, Dict, Any, Optional from datetime import datetime import hashlib import unicodedata from scraper_common import scraping_cancelled # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class DataPreprocessor: """ Data preprocessing pipeline for news dashboard content """ def __init__(self): self.cleaned_data = [] self.processing_stats = { 'total_processed': 0, 'successful_processing': 0, 'failed_processing': 0, 'content_issues': 0, 'metadata_issues': 0 } def preprocess_all_data(self, raw_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Main preprocessing function that processes all scraped data Args: raw_data: List of dictionaries containing scraped content Returns: List of preprocessed dictionaries ready for downstream operations """ logger.info(f"Starting preprocessing of {len(raw_data)} items") processed_data = [] for item in raw_data: # Check for cancellation during preprocessing if scraping_cancelled(): logger.warning("⚠️ Preprocessing cancelled by user") return processed_data try: processed_item = self._preprocess_single_item(item) if processed_item: processed_data.append(processed_item) self.processing_stats['successful_processing'] += 1 else: self.processing_stats['failed_processing'] += 1 except Exception as e: logger.error(f"Error processing item: {str(e)}") self.processing_stats['failed_processing'] += 1 self.processing_stats['total_processed'] += 1 logger.info(f"Preprocessing completed. Stats: {self.processing_stats}") return processed_data def _preprocess_single_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ Preprocess a single data item Args: item: Single dictionary containing scraped content Returns: Preprocessed dictionary or None if processing failed """ try: # Debug: Log the raw item structure logger.info(f"🔍 Raw item structure for preprocessing:") logger.info(f" - Keys: {list(item.keys())}") logger.info(f" - extracted_text length: {len(item.get('extracted_text', ''))}") logger.info(f" - content length: {len(item.get('content', ''))}") # Create base processed item processed_content = self._clean_and_structure_content(item) processed_item = { 'id': self._generate_unique_id(item), 'source_metadata': self._extract_source_metadata(item), 'content': processed_content, 'metadata': self._enrich_metadata(processed_content), 'quality_metrics': self._calculate_quality_metrics(processed_content), 'processing_timestamp': datetime.now().isoformat(), 'ready_for_operations': True } # Debug: Log the processed item structure logger.debug(f"🔍 Processed item structure for {processed_item.get('id', 'unknown')}:") logger.debug(f" - Keys: {list(processed_item.keys())}") logger.debug(f" - Content keys: {list(processed_item.get('content', {}).keys())}") logger.debug(f" - Metadata keys: {list(processed_item.get('metadata', {}).keys())}") # Validate the processed item if self._validate_processed_item(processed_item): return processed_item else: logger.warning(f"Validation failed for item: {processed_item.get('id', 'unknown')}") return None except Exception as e: logger.error(f"Error preprocessing item: {str(e)}") return None def _generate_unique_id(self, item: Dict[str, Any]) -> str: """ Generate a unique identifier for the content item Args: item: Raw data item Returns: Unique identifier string """ # Handle both text articles and document data url = item.get('url', '') or item.get('file_path', '') title = item.get('title', '') # Create a hash based on URL/file_path and title for uniqueness content_string = f"{url}{title}" return hashlib.md5(content_string.encode()).hexdigest()[:12] def _extract_source_metadata(self, item: Dict[str, Any]) -> Dict[str, Any]: """ Extract and structure source metadata Args: item: Raw data item Returns: Dictionary containing source metadata """ # Handle both text articles and document data content_text = item.get('content', '') or item.get('extracted_text', '') url = item.get('url', '') or item.get('file_path', '') # Preserve original source if it exists, otherwise identify from URL original_source = item.get('source', '') source_website = self._identify_source_website(url) # Use original source if available, otherwise use source_website # If source_website is 'unknown' and we have a URL, try to get source from URL using utils if not original_source and source_website == 'unknown' and url: try: from utils import get_source_from_url original_source = get_source_from_url(url) except: pass result = { 'url': url, 'title': item.get('title', ''), 'date': item.get('date', ''), 'category': item.get('category', ''), 'source': original_source or self._map_source_website_to_name(source_website), 'source_website': source_website, 'content_type': self._identify_content_type(item), 'file_type': item.get('file_type', ''), # Preserve original file_type for CSV detection 'language': self._detect_language(content_text), 'pdf_path': item.get('pdf_path', '') or item.get('file_path', ''), 'original_structure': { 'has_pdf': bool(item.get('pdf_path') or item.get('file_path')), 'content_length': len(content_text), 'title_length': len(item.get('title', '')) } } logger.debug(f"🔍 Extracted source metadata category: '{result.get('category', '')}'") logger.debug(f"🔍 Preserved source: '{result.get('source', '')}'") return result def _clean_and_structure_content(self, item: Dict[str, Any]) -> Dict[str, Any]: """ Clean and structure the content for downstream processing Args: item: Raw data item Returns: Dictionary containing cleaned and structured content """ # Handle both text articles and document data raw_content = item.get('content', '') or item.get('extracted_text', '') # Debug: Log content extraction logger.info(f"🔍 Content extraction debug:") logger.info(f" - item.get('content', ''): '{item.get('content', '')}'") logger.info(f" - item.get('extracted_text', ''): '{item.get('extracted_text', '')[:100]}...'") logger.info(f" - raw_content length: {len(raw_content)}") # Clean the content cleaned_content = self._clean_text(raw_content) logger.info(f" - cleaned_content length: {len(cleaned_content)}") # Extract structured information structured_content = { 'raw_text': raw_content, 'cleaned_text': cleaned_content, 'text_blocks': self._split_into_blocks(cleaned_content), 'sentences': self._split_into_sentences(cleaned_content), 'summary_ready': self._prepare_for_summarization(cleaned_content), 'translation_ready': self._prepare_for_translation(cleaned_content) } return structured_content def _enrich_metadata(self, processed_content: Dict[str, Any]) -> Dict[str, Any]: """ Enrich metadata with additional information Args: processed_content: Processed content dictionary Returns: Dictionary containing enriched metadata """ # Get the cleaned text from the processed content content = processed_content.get('cleaned_text', '') return { 'word_count': len(content.split()), 'character_count': len(content), 'sentence_count': len(self._split_into_sentences(content)), 'paragraph_count': len(self._split_into_blocks(content)), 'reading_time_minutes': self._calculate_reading_time(content), 'complexity_score': self._calculate_complexity_score(content) } def _calculate_quality_metrics(self, processed_content: Dict[str, Any]) -> Dict[str, Any]: """ Calculate quality metrics for the content Args: processed_content: Processed content dictionary Returns: Dictionary containing quality metrics """ content = processed_content.get('cleaned_text', '') title = processed_content.get('title', '') return { 'content_quality': { 'completeness_score': self._calculate_completeness_score(content), 'coherence_score': self._calculate_coherence_score(content), 'relevance_score': self._calculate_relevance_score(content, title), 'readability_score': self._calculate_readability_score(content) }, 'data_quality': { 'has_title': bool(title.strip()), 'has_content': bool(content.strip()), 'has_url': bool(processed_content.get('url', '').strip()), 'content_length_adequate': len(content) > 100, 'title_length_adequate': 10 < len(title) < 200 }, 'processing_quality': { 'successfully_cleaned': bool(self._clean_text(content)), 'successfully_structured': bool(self._split_into_blocks(content)) } } def _clean_text(self, text: str) -> str: """ Clean and normalize text content Args: text: Raw text content Returns: Cleaned text content """ if not text: return "" # Remove extra whitespace and normalize text = re.sub(r'\s+', ' ', text) text = text.strip() # Remove special characters but keep punctuation text = re.sub(r'[^\w\s\.\,\!\?\;\:\-\(\)]', '', text) # Normalize unicode text = unicodedata.normalize('NFKD', text) # Remove excessive punctuation text = re.sub(r'[\.]{2,}', '.', text) text = re.sub(r'[!]{2,}', '!', text) text = re.sub(r'[?]{2,}', '?', text) return text def _split_into_blocks(self, text: str) -> List[str]: """ Split text into logical blocks (paragraphs) Args: text: Text content Returns: List of text blocks """ if not text: return [] # Split by double newlines or periods followed by space blocks = re.split(r'\n\s*\n|\.\s+(?=[A-Z])', text) return [block.strip() for block in blocks if block.strip()] def _split_into_sentences(self, text: str) -> List[str]: """ Split text into sentences Args: text: Text content Returns: List of sentences """ if not text: return [] # Simple sentence splitting sentences = re.split(r'[.!?]+', text) return [sentence.strip() for sentence in sentences if sentence.strip()] def _prepare_for_summarization(self, text: str) -> Dict[str, Any]: """ Prepare content for summarization Args: text: Text content Returns: Dictionary ready for summarization """ blocks = self._split_into_blocks(text) sentences = self._split_into_sentences(text) return { 'text': text, 'blocks': blocks, 'sentences': sentences, 'block_count': len(blocks), 'sentence_count': len(sentences), 'avg_sentence_length': sum(len(s.split()) for s in sentences) / len(sentences) if sentences else 0, 'summary_priority': self._calculate_summary_priority(text) } def _prepare_for_translation(self, text: str) -> Dict[str, Any]: """ Prepare content for translation Args: text: Text content Returns: Dictionary ready for translation """ return { 'text': text, 'language_detected': self._detect_language(text), 'translation_blocks': self._split_into_blocks(text), 'character_count': len(text), 'word_count': len(text.split()), 'translation_priority': self._calculate_translation_priority(text) } def _identify_source_website(self, url: str) -> str: """ Identify the source website from URL Args: url: URL string Returns: Website identifier """ if 'reliefweb.int' in url: return 'reliefweb' elif 'fscluster.org' in url: return 'fscluster' elif 'mopnd.govsomaliland.org' in url: return 'mopnd' elif 'nbs.gov.so' in url: return 'nbs' elif 'humdata.org' in url: return 'hdx' elif 'logcluster.org' in url: return 'logcluster' elif 'fsnau.org' in url: return 'fsnau' elif 'fews.net' in url: return 'fews' elif 'icpac.net' in url: if 'seasonal-forecast' in url.lower(): return 'icpac_seasonal_forecast' else: return 'icpac' elif 'faoswalim.org' in url: return 'faoswalim' else: return 'unknown' def _map_source_website_to_name(self, source_website: str) -> str: """ Map source website identifier to proper source name Args: source_website: Website identifier (lowercase) Returns: Proper source name """ mapping = { 'reliefweb': 'ReliefWeb', 'fscluster': 'FS Cluster', 'mopnd': 'MOPND Somaliland', 'nbs': 'NBS Somalia', 'hdx': 'HDX Humanitarian Data Exchange', 'logcluster': 'LogCluster', 'fsnau': 'FSNau - Food Security and Nutrition Analysis Unit', 'fews': 'FEWS NET', 'icpac': 'ICPAC', 'icpac_seasonal_forecast': 'ICPAC - IGAD Climate Prediction and Applications Centre - Seasonal Forecast', 'faoswalim': 'FAO SWALIM' } return mapping.get(source_website, 'Unknown') def _identify_content_type(self, item: Dict[str, Any]) -> str: """ Identify the type of content Args: item: Raw data item Returns: Content type identifier """ # Handle document data with file_type field if item.get('file_type'): file_type = item.get('file_type', '').lower() if 'pdf' in file_type: return 'pdf_document' elif 'doc' in file_type: return 'word_document' elif 'csv' in file_type: return 'csv_data' else: return f'{file_type}_document' # Handle legacy pdf_path field elif item.get('pdf_path') or item.get('file_path'): return 'pdf_document' # Handle URL-based content type detection url = item.get('url', '') or item.get('file_path', '') if 'article' in url.lower(): return 'article' elif 'publication' in url.lower(): return 'publication' elif 'journal' in url.lower(): return 'journal' elif 'event' in url.lower(): return 'event' else: return 'general' def _detect_language(self, text: str) -> str: """ Detect language of the text (simplified) Args: text: Text content Returns: Language code """ if not text: return 'unknown' # Simple language detection based on common words somali_words = ['somalia', 'somaliland', 'puntland', 'mogadishu', 'hargeisa'] english_words = ['the', 'and', 'of', 'in', 'to', 'for', 'with', 'on', 'at'] text_lower = text.lower() somali_count = sum(1 for word in somali_words if word in text_lower) english_count = sum(1 for word in english_words if word in text_lower) if somali_count > english_count: return 'so' elif english_count > somali_count: return 'en' else: return 'unknown' def _calculate_reading_time(self, text: str) -> float: """ Calculate estimated reading time in minutes Args: text: Text content Returns: Reading time in minutes """ word_count = len(text.split()) return round(word_count / 200, 1) # Average reading speed: 200 words per minute def _calculate_complexity_score(self, text: str) -> float: """ Calculate text complexity score Args: text: Text content Returns: Complexity score (0-1) """ if not text: return 0.0 sentences = self._split_into_sentences(text) if not sentences: return 0.0 avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences) long_words = sum(1 for word in text.split() if len(word) > 6) total_words = len(text.split()) complexity = (avg_sentence_length / 20) + (long_words / total_words if total_words > 0 else 0) return min(complexity, 1.0) def _calculate_completeness_score(self, content: str) -> float: """ Calculate content completeness score Args: content: Text content Returns: Completeness score (0-1) """ if not content: return 0.0 score = 0.0 # Length check if len(content) > 100: score += 0.3 # Sentence count check sentences = self._split_into_sentences(content) if len(sentences) > 3: score += 0.3 # Paragraph count check blocks = self._split_into_blocks(content) if len(blocks) > 1: score += 0.2 # Basic content check if len(content.split()) > 10: score += 0.2 return min(score, 1.0) def _calculate_coherence_score(self, content: str) -> float: """ Calculate content coherence score Args: content: Text content Returns: Coherence score (0-1) """ if not content: return 0.0 # Simple coherence based on sentence structure sentences = self._split_into_sentences(content) if len(sentences) < 2: return 0.5 # Check for proper sentence endings proper_endings = sum(1 for s in sentences if s.endswith(('.', '!', '?'))) coherence = proper_endings / len(sentences) return min(coherence, 1.0) def _calculate_relevance_score(self, content: str, title: str) -> float: """ Calculate content relevance score Args: content: Text content title: Title text Returns: Relevance score (0-1) """ if not content or not title: return 0.0 # Check if title words appear in content title_words = set(title.lower().split()) content_words = set(content.lower().split()) overlap = len(title_words.intersection(content_words)) relevance = overlap / len(title_words) if title_words else 0.0 return min(relevance, 1.0) def _calculate_readability_score(self, content: str) -> float: """ Calculate readability score Args: content: Text content Returns: Readability score (0-1) """ if not content: return 0.0 sentences = self._split_into_sentences(content) words = content.split() if not sentences or not words: return 0.0 # Simple readability based on sentence length and word length avg_sentence_length = len(words) / len(sentences) avg_word_length = sum(len(word) for word in words) / len(words) # Normalize to 0-1 scale readability = 1.0 - (avg_sentence_length / 50) - (avg_word_length / 10) return max(0.0, min(readability, 1.0)) def _calculate_summary_priority(self, text: str) -> str: """ Calculate summary priority Args: text: Text content Returns: Priority level """ word_count = len(text.split()) if word_count > 1000: return 'high' elif word_count > 500: return 'medium' else: return 'low' def _calculate_translation_priority(self, text: str) -> str: """ Calculate translation priority Args: text: Text content Returns: Priority level """ # Check for important keywords important_keywords = ['emergency', 'crisis', 'disaster', 'flood', 'drought', 'food', 'security'] text_lower = text.lower() if any(keyword in text_lower for keyword in important_keywords): return 'high' elif len(text) > 500: return 'medium' else: return 'low' def _validate_processed_item(self, item: Dict[str, Any]) -> bool: """ Validate processed item Args: item: Processed item Returns: True if valid, False otherwise """ required_fields = ['id', 'source_metadata', 'content', 'metadata'] # Debug: Check which fields are missing missing_fields = [] for field in required_fields: if field not in item: missing_fields.append(field) if missing_fields: logger.warning(f"❌ Missing required fields: {missing_fields}") logger.warning(f"📋 Available fields: {list(item.keys())}") return False # Check content quality content = item.get('content', {}) cleaned_text = content.get('cleaned_text', '') if not cleaned_text: logger.warning(f"❌ No cleaned_text found in content") logger.warning(f"📋 Content structure: {content}") return False # Check metadata quality metadata = item.get('metadata', {}) word_count = metadata.get('word_count', 0) if word_count < 10: logger.warning(f"❌ Word count too low: {word_count} (minimum: 10)") logger.warning(f"📋 Metadata: {metadata}") return False logger.debug(f"✅ Validation passed for item {item.get('id', 'unknown')}") return True def get_processing_stats(self) -> Dict[str, Any]: """ Get processing statistics Returns: Dictionary containing processing statistics """ return self.processing_stats.copy() def preprocess_scraped_data(raw_data: List[Dict[str, Any]], output_path: Optional[str] = None) -> List[Dict[str, Any]]: """ Convenience function to preprocess scraped data Args: raw_data: List of raw scraped data output_path: Optional output file path (deprecated - not used) Returns: List of preprocessed data """ preprocessor = DataPreprocessor() processed_data = preprocessor.preprocess_all_data(raw_data) return processed_data if __name__ == "__main__": # Example usage sample_data = [ { 'title': 'Sample Article', 'content': 'This is a sample article about water management in Somalia.', 'url': 'https://example.com/article1', 'date': '2024-01-01' } ] processed = preprocess_scraped_data(sample_data) print(f"Processed {len(processed)} items")