diff --git "a/document_scraper.py" "b/document_scraper.py"
new file mode 100644
--- /dev/null
+++ "b/document_scraper.py"
@@ -0,0 +1,2451 @@
+"""
+Document Scraper - Handles PDF and document processing
+"""
+
+import asyncio
+import json
+import logging
+import os
+import hashlib
+import tempfile
+import requests
+import urllib3
+from datetime import datetime
+from typing import Any, Dict, List, Optional
+from urllib.parse import urlparse, urlunparse, unquote
+
+# Import common functions from scraper_common
+from scraper_common import (
+ WEBSITE_CONFIG, MAX_PDF_LIMIT, MAX_ARTICLE_LIMIT, MAX_PAGE_LIMIT,
+ ensure_archive_directory, convert_to_absolute_url,
+ set_scraping_cancelled, scraping_cancelled, force_close_browser,
+ reset_global_pdf_count, increment_global_pdf_count, get_global_pdf_count, is_pdf_limit_reached,
+ get_pdf_websites
+)
+
+# Import date filtering utilities
+from date_filter import is_date_in_range, parse_date_input, standardize_date
+
+# Suppress SSL warnings
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
+# Configure logging
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+
+def construct_navigation_url(base_url: str, nav_addition: str) -> str:
+ """
+ Construct navigation URL by properly handling trailing slashes and query parameters
+ """
+ # Remove trailing slash from base URL if it exists
+ if base_url.endswith('/'):
+ base_url = base_url.rstrip('/')
+
+ # Check if nav_addition starts with / or ?
+ if nav_addition.startswith('/'):
+ # Direct path addition
+ return base_url + nav_addition
+ elif nav_addition.startswith('?'):
+ # Query parameter addition
+ return base_url + nav_addition
+ else:
+ # Default: add as path
+ return base_url + '/' + nav_addition
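+
+# Illustrative examples (hypothetical URLs) of how construct_navigation_url joins a
+# listing URL with a configured navigation_url_addition:
+#
+#   construct_navigation_url("https://example.org/reports/", "?page=3")
+#   -> "https://example.org/reports?page=3"
+#   construct_navigation_url("https://example.org/reports", "page/3")
+#   -> "https://example.org/reports/page/3"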
+
+# Global variables for document processing
+mopnd_article_dates = {}
+mopnd_article_titles = {}
+
+def clear_mopnd_cache():
+ """Clear MOPND article cache when starting a new scraping session"""
+ global mopnd_article_dates, mopnd_article_titles
+ mopnd_article_dates.clear()
+ mopnd_article_titles.clear()
+ logger.info("๐งน Cleared MOPND article cache")
+
+def get_pdf_hash(pdf_url: str) -> str:
+ """Generate a hash for the PDF URL to use as cache key"""
+ return hashlib.md5(pdf_url.encode()).hexdigest()
+
+def is_pdf_archived(pdf_url: str, source: str) -> bool:
+ """Check if PDF is already archived"""
+ ensure_archive_directory()
+ hash_key = get_pdf_hash(pdf_url)
+ archive_dir = f"archive/{source}"
+ date_folder = datetime.now().strftime("%Y-%m-%d")
+ archive_path = f"{archive_dir}/{date_folder}"
+
+ if os.path.exists(archive_path):
+ for file in os.listdir(archive_path):
+ if file.startswith(hash_key):
+ return True
+ return False
+
+def get_archived_pdf_path(pdf_url: str, source: str) -> Optional[str]:
+ """Get the archived PDF file path, or None if the PDF is not archived"""
+ ensure_archive_directory()
+ hash_key = get_pdf_hash(pdf_url)
+ archive_dir = f"archive/{source}"
+ date_folder = datetime.now().strftime("%Y-%m-%d")
+ archive_path = f"{archive_dir}/{date_folder}"
+
+ if os.path.exists(archive_path):
+ for file in os.listdir(archive_path):
+ if file.startswith(hash_key):
+ return os.path.join(archive_path, file)
+ return None
+
+def archive_pdf(pdf_url: str, content: bytes, source: str) -> str:
+ """Archive PDF content and return the local file path"""
+ logger.info(f"๐พ Starting PDF archiving process...")
+ ensure_archive_directory()
+
+ # Create source-specific archive directory
+ archive_dir = f"archive/{source}"
+ date_folder = datetime.now().strftime("%Y-%m-%d")
+ archive_path = f"{archive_dir}/{date_folder}"
+
+ # Create directory if it doesn't exist
+ os.makedirs(archive_path, exist_ok=True)
+
+ # Generate unique filename using hash
+ hash_key = get_pdf_hash(pdf_url)
+ filename = f"{hash_key}.pdf"
+ file_path = os.path.join(archive_path, filename)
+
+ # Save PDF content
+ with open(file_path, 'wb') as f:
+ f.write(content)
+
+ logger.info(f"๐ PDF archived to: {file_path}")
+
+ # Update archive index
+ update_archive_index(pdf_url, file_path, source)
+
+ return file_path
+
+def archive_file(file_url: str, content: bytes, source: str, file_extension: str = "csv") -> str:
+ """Archive file content (CSV, etc.) and return the local file path"""
+ logger.info(f"๐พ Starting file archiving process for {file_extension.upper()}...")
+ ensure_archive_directory()
+
+ # Create source-specific archive directory
+ archive_dir = f"archive/{source}"
+ date_folder = datetime.now().strftime("%Y-%m-%d")
+ archive_path = f"{archive_dir}/{date_folder}"
+
+ # Create directory if it doesn't exist
+ os.makedirs(archive_path, exist_ok=True)
+
+ # Generate unique filename using hash
+ hash_key = get_pdf_hash(file_url)
+ filename = f"{hash_key}.{file_extension}"
+ file_path = os.path.join(archive_path, filename)
+
+ # Save file content
+ with open(file_path, 'wb') as f:
+ f.write(content)
+
+ logger.info(f"๐ File archived to: {file_path}")
+
+ # Update archive index
+ update_archive_index(file_url, file_path, source)
+
+ return file_path
+
+def update_archive_index(pdf_url: str, local_path: str, source: str):
+ """Update the archive index with PDF information"""
+ ensure_archive_directory()
+ index_file = f"archive/{source}/index.json"
+
+ # Load existing index or create new one
+ if os.path.exists(index_file):
+ try:
+ with open(index_file, 'r') as f:
+ index = json.load(f)
+ except (json.JSONDecodeError, OSError):  # corrupt or unreadable index - start fresh
+ index = {}
+ else:
+ index = {}
+
+ # Add new entry
+ hash_key = get_pdf_hash(pdf_url)
+ index[hash_key] = {
+ "url": pdf_url,
+ "local_path": local_path,
+ "source": source,
+ "archived_date": datetime.now().isoformat()
+ }
+
+ # Save updated index
+ with open(index_file, 'w') as f:
+ json.dump(index, f, indent=2)
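+
+# Resulting on-disk archive layout (illustrative; the source name and hash value
+# below are hypothetical):
+#
+#   archive/
+#     fsnau/
+#       index.json                              <- maintained by update_archive_index
+#       2024-05-01/
+#         9e107d9d372bb6826bd81d3542a419d6.pdf  <- get_pdf_hash(pdf_url) + extension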
+
+def download_and_save_pdf(pdf_url: str, source: str = "unknown") -> dict:
+ """
+ Download PDF and save it to archive, return metadata
+ """
+ try:
+ logger.info(f"โฌ๏ธ Downloading PDF: {pdf_url}")
+ logger.info(f"๐ Source: {source}")
+
+ # Check if PDF is already archived
+ if is_pdf_archived(pdf_url, source):
+ logger.info(f"โ
PDF already archived: {pdf_url}")
+ cached_path = get_archived_pdf_path(pdf_url, source)
+ return {
+ "success": True,
+ "path": cached_path,
+ "size": os.path.getsize(cached_path),
+ "message": "PDF already archived"
+ }
+
+ # Create headers to mimic a browser request
+ parsed_url = urlparse(pdf_url)
+ base_domain = f"{parsed_url.scheme}://{parsed_url.netloc}"
+
+ headers = {
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
+ "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
+ "Accept-Language": "en-US,en;q=0.9",
+ "Connection": "keep-alive",
+ "Referer": base_domain
+ }
+
+ logger.info(f"๐ Using base domain as referer: {base_domain}")
+
+ # Try direct download with headers first
+ try:
+ session = requests.Session()
+ # Disable SSL verification for problematic certificates
+ session.verify = False
+
+ # First, visit the domain homepage to get cookies
+ session.get(base_domain, headers=headers, timeout=30, verify=False)
+ logger.info(f"๐ช Visited domain homepage to gather cookies")
+
+ # Then try to download the PDF with proper headers
+ response = session.get(pdf_url, headers=headers, timeout=30, verify=False)
+ response.raise_for_status()
+ logger.info(f"โ
PDF downloaded successfully. Size: {len(response.content)} bytes")
+ except Exception as e:
+ logger.error(f"โ Error downloading PDF: {str(e)}")
+ raise
+
+ # Archive the PDF
+ archived_path = archive_pdf(pdf_url, response.content, source)
+ logger.info(f"๐ PDF archived to: {archived_path}")
+
+ return {
+ "success": True,
+ "path": archived_path,
+ "size": len(response.content),
+ "message": "PDF downloaded and archived successfully"
+ }
+ except Exception as e:
+ # Direct download failed, return error without fallback
+ logger.error(f"โ PDF download failed for {pdf_url}: {str(e)}")
+ return {
+ "success": False,
+ "path": None,
+ "size": 0,
+ "message": f"Error downloading PDF: {str(e)}"
+ }
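+
+# Typical caller pattern (mirrors extract_pdf_text further below; the source value
+# here is illustrative):
+#
+#   result = download_and_save_pdf(pdf_url, source="fsnau")
+#   if result["success"]:
+#       text = extract_text_from_pdf_file(result["path"])
+#   else:
+#       logger.error(result["message"])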
+
+def download_and_save_file(file_url: str, source: str = "unknown", file_type: str = "csv") -> dict:
+ """
+ Download file (CSV, etc.) and save it to archive, return metadata
+ """
+ try:
+ logger.info(f"โฌ๏ธ Downloading {file_type.upper()}: {file_url}")
+ logger.info(f"๐ Source: {source}")
+
+ # Determine file extension
+ file_extension = file_type.lower()
+ if file_extension not in ["csv", "xlsx", "xls", "png", "jpg", "jpeg", "gif", "webp"]:
+ # Try to determine from URL if not in known types
+ if file_url.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
+ file_extension = file_url.lower().split('.')[-1]
+ else:
+ file_extension = "csv" # Default to CSV
+
+ # Check if file is already archived (using same hash mechanism as PDFs)
+ if is_pdf_archived(file_url, source):
+ logger.info(f"โ
File already archived: {file_url}")
+ cached_path = get_archived_pdf_path(file_url, source)
+ # Check if the cached file has the right extension
+ if cached_path and os.path.exists(cached_path):
+ return {
+ "success": True,
+ "path": cached_path,
+ "size": os.path.getsize(cached_path),
+ "file_type": file_type,
+ "message": "File already archived"
+ }
+
+ # Create headers to mimic a browser request
+ parsed_url = urlparse(file_url)
+ base_domain = f"{parsed_url.scheme}://{parsed_url.netloc}"
+
+ headers = {
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
+ "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
+ "Accept-Language": "en-US,en;q=0.9",
+ "Connection": "keep-alive",
+ "Referer": base_domain
+ }
+
+ logger.info(f"๐ Using base domain as referer: {base_domain}")
+
+ # Try direct download with headers first
+ try:
+ session = requests.Session()
+ # Disable SSL verification for problematic certificates
+ session.verify = False
+
+ # First, visit the domain homepage to get cookies
+ session.get(base_domain, headers=headers, timeout=30, verify=False)
+ logger.info(f"๐ช Visited domain homepage to gather cookies")
+
+ # Then try to download the file with proper headers
+ response = session.get(file_url, headers=headers, timeout=30, verify=False)
+ response.raise_for_status()
+ logger.info(f"โ
{file_type.upper()} downloaded successfully. Size: {len(response.content)} bytes")
+ except Exception as e:
+ logger.error(f"โ Error downloading {file_type.upper()}: {str(e)}")
+ raise
+
+ # Archive the file
+ archived_path = archive_file(file_url, response.content, source, file_extension)
+ logger.info(f"๐ {file_type.upper()} archived to: {archived_path}")
+
+ return {
+ "success": True,
+ "path": archived_path,
+ "size": len(response.content),
+ "file_type": file_type,
+ "message": f"{file_type.upper()} downloaded and archived successfully"
+ }
+ except Exception as e:
+ # Direct download failed, return error without fallback
+ logger.error(f"โ {file_type.upper()} download failed for {file_url}: {str(e)}")
+ return {
+ "success": False,
+ "path": None,
+ "size": 0,
+ "file_type": file_type,
+ "message": f"Error downloading {file_type.upper()}: {str(e)}"
+ }
+
+def get_website_type_from_source(source: str) -> str:
+ """
+ Map source name to website type for config lookup
+ """
+ source_to_type = {
+ "FS Cluster": "fscluster",
+ "ReliefWeb": "reliefweb",
+ "NBS Somalia": "nbs",
+ "HDX": "hdx",
+ "HDX Humanitarian Data Exchange": "hdx",
+ "LogCluster": "logcluster",
+ "FSNau": "fsnau",
+ "FSNau - Food Security and Nutrition Analysis Unit": "fsnau",
+ "FSNau Publications": "fsnau_publications",
+ "FEWS NET": "fews",
+ "FEWS NET - Famine Early Warning Systems Network": "fews",
+ "ICPAC": "icpac",
+ "ICPAC - IGAD Climate Prediction and Applications Centre": "icpac",
+ "ICPAC - IGAD Climate Prediction and Applications Centre - Seasonal Forecast": "icpac_seasonal_forecast",
+ "FAO SWALIM": "faoswalim",
+ "FAO SWALIM Publications": "faoswalim_publications",
+ "FAO SWALIM Journals": "faoswalim_journals",
+ "FAO SWALIM Events": "faoswalim_events",
+ "FAO SWALIM Articles": "faoswalim_articles",
+ "FAO SWALIM Flood Watch": "faoswalim_flood_watch",
+ "FAO SWALIM Water Publications": "faoswalim_water_publications",
+ "MOPND Somaliland": "mopnd",
+ "Copernicus Drought Observatory": "copernicus_drought",
+ "fscluster": "fscluster",
+ "reliefweb": "reliefweb",
+ "NBS": "nbs",
+ "HDX": "hdx",
+ "LogCluster": "logcluster",
+ "FSNau": "fsnau",
+ "FSNau Publications": "fsnau_publications",
+ "FEWS NET": "fews",
+ "ICPAC": "icpac",
+ "FAO SWALIM": "faoswalim"
+ }
+ return source_to_type.get(source, "fscluster") # Default fallback
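+
+# Examples (values taken from the mapping above):
+#
+#   get_website_type_from_source("MOPND Somaliland")  -> "mopnd"
+#   get_website_type_from_source("Unknown Source")    -> "fscluster"  (default fallback)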
+
+
+def extract_pdf_text(pdf_url: str, source: str = "unknown") -> str:
+ """
+ Extract text content from archived PDF using multiple methods
+ """
+ try:
+ logger.info(f"๐ Starting PDF text extraction for URL: {pdf_url}")
+ logger.info(f"๐ Source: {source}")
+
+ # Check if URL is relative and convert to absolute URL
+ parsed_url = urlparse(pdf_url)
+
+ # If the URL is relative (no scheme/netloc), we need to construct complete URL
+ if not parsed_url.scheme and pdf_url.startswith('/'):
+ # Get website type from source and lookup base_url from config
+ website_type = get_website_type_from_source(source)
+ config = WEBSITE_CONFIG.get(website_type, {})
+ base_url = config.get('base_url', 'https://fscluster.org') # Default fallback
+
+ logger.info(f"๐ Using base_url from config for {website_type}: {base_url}")
+
+ # Construct complete URL
+ complete_url = f"{base_url}{pdf_url}"
+ logger.info(f"๐ Converted relative URL {pdf_url} to absolute URL: {complete_url}")
+ pdf_url = complete_url
+
+ # Get archived PDF path
+ if is_pdf_archived(pdf_url, source):
+ cached_path = get_archived_pdf_path(pdf_url, source)
+ logger.info(f"๐ Using archived PDF: {cached_path}")
+ result = extract_text_from_pdf_file(cached_path)
+ logger.info(f"๐ Extracted text length: {len(result)} characters")
+
+ if not result.strip():
+ logger.warning("โ ๏ธ No text extracted from PDF - might be image-based or corrupted")
+ else:
+ logger.info(f"โ
Successfully extracted text from PDF")
+
+ return result
+ else:
+ # Try to download the PDF first if not in archive
+ logger.info(f"โ PDF not found in archive: {pdf_url}")
+ logger.info(f"โฌ๏ธ Attempting to download PDF now...")
+
+ # Attempt the download
+ download_result = download_and_save_pdf(pdf_url, source)
+ if download_result["success"]:
+ logger.info(f"โ
Successfully downloaded PDF: {download_result['path']}")
+ # Now extract text from the newly downloaded PDF
+ result = extract_text_from_pdf_file(download_result["path"])
+ return result
+ else:
+ logger.error(f"โ Failed to download PDF: {download_result['message']}")
+
+ # Special failure message for fscluster
+ if source.lower() == "fscluster" and "403" in download_result["message"]:
+ return f"PDF download blocked by fscluster.org (403 Forbidden). Try visiting the document page first in your browser before scraping, or use authenticated session cookies: {pdf_url}"
+ else:
+ return f"PDF not found in archive and download failed: {pdf_url}"
+
+ except Exception as e:
+ logger.error(f"โ Error extracting PDF text from {pdf_url}: {str(e)}")
+ return f"Error extracting PDF: {str(e)}"
+
+def extract_text_from_pdf_file(pdf_file_or_path):
+ """
+ Extract text from PDF using multiple methods for better compatibility
+ """
+ text_content = ""
+
+ try:
+ logger.info(f"๐ Starting PDF text extraction...")
+
+ # Method 1: Try pypdf first (most reliable for text-based PDFs)
+ try:
+ logger.info(f"๐ Trying pypdf extraction...")
+ import pypdf
+
+ if isinstance(pdf_file_or_path, str):
+ # File path
+ logger.info(f"๐ Reading from file path: {pdf_file_or_path}")
+ with open(pdf_file_or_path, 'rb') as file:
+ pdf_reader = pypdf.PdfReader(file)
+ logger.info(f"๐ PDF has {len(pdf_reader.pages)} pages")
+ for i, page in enumerate(pdf_reader.pages):
+ page_text = page.extract_text()
+ if page_text:
+ text_content += page_text + "\n"
+ else:
+ # BytesIO objects
+ logger.info(f"๐ Reading from BytesIO object")
+ pdf_reader = pypdf.PdfReader(pdf_file_or_path)
+ logger.info(f"๐ PDF has {len(pdf_reader.pages)} pages")
+ for i, page in enumerate(pdf_reader.pages):
+ page_text = page.extract_text()
+ if page_text:
+ text_content += page_text + "\n"
+
+ if text_content.strip():
+ logger.info(f"โ
Successfully extracted text using pypdf: {len(text_content)} characters")
+ return text_content.strip()
+ else:
+ logger.warning("โ ๏ธ pypdf extracted no text")
+ except Exception as e:
+ logger.warning(f"โ ๏ธ pypdf extraction failed: {str(e)}")
+
+ # Method 2: Try pdfplumber (better for complex layouts)
+ try:
+ logger.info(f"๐ Trying pdfplumber extraction...")
+ import pdfplumber
+
+ if isinstance(pdf_file_or_path, str):
+ with pdfplumber.open(pdf_file_or_path) as pdf:
+ logger.info(f"๐ PDF has {len(pdf.pages)} pages")
+ for i, page in enumerate(pdf.pages):
+ page_text = page.extract_text()
+ if page_text:
+ text_content += page_text + "\n"
+ else:
+ # For BytesIO objects, we need to save to temp file first
+ with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
+ temp_file.write(pdf_file_or_path.getvalue())
+ temp_file.flush()
+
+ with pdfplumber.open(temp_file.name) as pdf:
+ logger.info(f"๐ PDF has {len(pdf.pages)} pages")
+ for i, page in enumerate(pdf.pages):
+ page_text = page.extract_text()
+ if page_text:
+ text_content += page_text + "\n"
+
+ # Clean up temp file
+ os.unlink(temp_file.name)
+ logger.info(f"๐๏ธ Temp file cleaned up")
+
+ if text_content.strip():
+ logger.info(f"โ
Successfully extracted text using pdfplumber: {len(text_content)} characters")
+ return text_content.strip()
+ else:
+ logger.warning("โ ๏ธ pdfplumber extracted no text")
+ except ImportError:
+ logger.warning("โ ๏ธ pdfplumber not available")
+ except Exception as e:
+ logger.warning(f"โ ๏ธ pdfplumber extraction failed: {str(e)}")
+
+ # Method 3: Try PyMuPDF (fitz) for better text extraction
+ try:
+ logger.info(f"๐ Trying PyMuPDF extraction...")
+ import fitz # PyMuPDF
+
+ if isinstance(pdf_file_or_path, str):
+ doc = fitz.open(pdf_file_or_path)
+ else:
+ doc = fitz.open(stream=pdf_file_or_path.getvalue(), filetype="pdf")
+
+ logger.info(f"๐ PDF has {doc.page_count} pages")
+ for page_num in range(doc.page_count):
+ page = doc.load_page(page_num)
+ page_text = page.get_text()
+ if page_text:
+ text_content += page_text + "\n"
+
+ doc.close()
+
+ if text_content.strip():
+ logger.info(f"โ
Successfully extracted text using PyMuPDF: {len(text_content)} characters")
+ return text_content.strip()
+ else:
+ logger.warning("โ ๏ธ PyMuPDF extracted no text")
+ except ImportError:
+ logger.warning("โ ๏ธ PyMuPDF not available")
+ except Exception as e:
+ logger.warning(f"โ ๏ธ PyMuPDF extraction failed: {str(e)}")
+
+ # Try one more advanced method for text-within-images using OCR
+ # This is especially helpful for LogCluster PDFs which often have text embedded in images
+ if not text_content.strip() or len(text_content.strip()) < 500: # If no text or very little text extracted
+ try:
+ logger.info(f"๐ Trying OCR extraction as last resort...")
+ import pytesseract
+ from PIL import Image
+ from pdf2image import convert_from_path, convert_from_bytes
+
+ if isinstance(pdf_file_or_path, str):
+ # Convert PDF to images
+ images = convert_from_path(pdf_file_or_path, dpi=300)
+ else:
+ # For BytesIO objects
+ images = convert_from_bytes(pdf_file_or_path.getvalue(), dpi=300)
+
+ logger.info(f"๐ผ๏ธ Converted PDF to {len(images)} images for OCR")
+
+ for i, image in enumerate(images):
+ # Extract text using OCR
+ page_text = pytesseract.image_to_string(image, lang='eng')
+ if page_text.strip():
+ text_content += f"Page {i+1} (OCR):\n{page_text}\n"
+ logger.info(f"๐ OCR extracted {len(page_text)} characters from page {i+1}")
+
+ if text_content.strip():
+ logger.info(f"โ
Successfully extracted text using OCR: {len(text_content)} characters")
+ return text_content.strip()
+ else:
+ logger.warning("โ ๏ธ OCR extracted no text")
+ except ImportError:
+ logger.warning("โ ๏ธ OCR libraries not available (pytesseract, pdf2image)")
+ except Exception as e:
+ logger.warning(f"โ OCR extraction failed: {str(e)}")
+
+ # If we got some text content from earlier methods, return it even if it's partial
+ if text_content.strip():
+ logger.info(f"โ ๏ธ Returning partial text extraction ({len(text_content.strip())} characters)")
+ return text_content.strip()
+
+ # If all methods fail, return a message
+ logger.warning("โ All PDF extraction methods failed")
+ return "PDF text extraction failed - document may be image-based or corrupted"
+
+ except Exception as e:
+ logger.error(f"โ Error in PDF text extraction: {str(e)}")
+ return f"PDF text extraction failed: {str(e)}"
+
+async def download_all_pdfs_from_page(page, url: str, config: dict, source: str, start_date: str = None, end_date: str = None) -> List[dict]:
+ """
+ Download all PDFs from multiple pages with pagination support
+ Supports both approaches:
+ 1. Direct PDF discovery (pdf_links only)
+ 2. Page links first, then PDF discovery (page_links + pdf_links)
+ """
+ try:
+ logger.info(f"๐ Starting PDF download from page: {url}")
+ logger.info(f"๐ Source: {source}")
+
+ # Clear MOPND cache if this is a MOPND scraping session
+ if source == "mopnd":
+ clear_mopnd_cache()
+
+ # Reset global PDF counter at the start of processing
+ reset_global_pdf_count()
+ logger.info(f"๐ Reset global PDF counter. Limit: {MAX_PDF_LIMIT}")
+
+ # Check for special table extraction mode
+ extract_table_as_csv = config.get("extract_table_as_csv", False)
+ if extract_table_as_csv:
+ logger.info("๐ Using table extraction mode: Extract table data and convert to CSV")
+ return await extract_table_as_csv_file(page, url, config, source, start_date, end_date)
+
+ # Determine which approach to use
+ page_links_selector = config.get("page_links")
+ pdf_links_selector = config.get("pdf_links")
+ file_links_selector = config.get("file_links")
+
+ # Debug logging
+ logger.debug(f"๐ Config check for source '{source}': page_links={page_links_selector}, pdf_links={pdf_links_selector}, file_links={file_links_selector}")
+
+ # If page_links is configured and not null/empty, use Approach 2
+ # This allows us to navigate to individual pages and extract PDFs from each
+ if page_links_selector and pdf_links_selector:
+ # Approach 2: Page links first, then PDF discovery
+ logger.info("๐ Using Approach 2: Page links first, then PDF discovery")
+ return await download_pdfs_via_page_links(page, url, config, source, start_date, end_date)
+ elif page_links_selector and file_links_selector:
+ # Approach 2: Page links first, then file discovery
+ logger.info("๐ Using Approach 2: Page links first, then file discovery")
+ return await download_pdfs_via_page_links(page, url, config, source, start_date, end_date)
+ elif pdf_links_selector or file_links_selector:
+ # Approach 1: Direct PDF/file discovery
+ logger.info("๐ Using Approach 1: Direct PDF/file discovery")
+ return await download_pdfs_direct(page, url, config, source, start_date, end_date)
+ else:
+ logger.error("โ No pdf_links, file_links, or page_links configured")
+ return []
+
+ except Exception as e:
+ logger.error(f"โ Error downloading PDFs from pages: {str(e)}")
+ return []
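+
+# Hedged config sketches (the selectors are hypothetical) showing how the approach
+# is chosen above:
+#
+#   # Approach 1 - direct discovery: only pdf_links (or file_links) is set
+#   {"pdf_links": "a[href$='.pdf']"}
+#
+#   # Approach 2 - page links first: page_links plus pdf_links/file_links,
+#   # with optional pagination
+#   {"page_links": ".views-row h3 a",
+#    "pdf_links": "a[href$='.pdf']",
+#    "navigation_selector": "li.pager-next a",
+#    "navigation_url_addition": "?page={page_no}",
+#    "start_page": 1}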
+
+
+async def extract_table_as_csv_file(page, url: str, config: dict, source: str, start_date: str = None, end_date: str = None) -> List[dict]:
+ """
+ Special function to extract table data and convert to CSV
+ """
+ try:
+ logger.info(f"๐ Starting table extraction from page: {url}")
+ logger.info(f"๐ Source: {source}")
+
+ # Navigate to the page
+ await page.goto(url, wait_until="domcontentloaded", timeout=30000)
+
+ # Get content selector (should be "td, th" for table cells)
+ content_selector = config.get("content")
+ if not content_selector:
+ logger.error("โ No content selector configured for table extraction")
+ return []
+
+ logger.info(f"๐ Extracting table data using selector: {content_selector}")
+
+ # Extract all table cells (td and th)
+ cell_elements = await page.query_selector_all(content_selector)
+ logger.info(f"๐ Found {len(cell_elements)} table cells")
+
+ if not cell_elements:
+ logger.warning("โ ๏ธ No table cells found")
+ return []
+
+ # Extract text from all cells
+ cells_data = []
+ for element in cell_elements:
+ try:
+ cell_text = await element.text_content()
+ if cell_text:
+ cells_data.append(cell_text.strip())
+ else:
+ cells_data.append("")
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Error extracting cell text: {str(e)}")
+ cells_data.append("")
+
+ # Try to find the table structure to organize data into rows
+ # First, try to find all table rows
+ table_rows = []
+ try:
+ # Try to find table rows
+ row_elements = await page.query_selector_all("tr")
+ if row_elements:
+ logger.info(f"๐ Found {len(row_elements)} table rows")
+ for row_element in row_elements:
+ row_cells = await row_element.query_selector_all("td, th")
+ row_data = []
+ for cell in row_cells:
+ try:
+ cell_text = await cell.text_content()
+ row_data.append(cell_text.strip() if cell_text else "")
+ except:
+ row_data.append("")
+ if row_data: # Only add non-empty rows
+ table_rows.append(row_data)
+ except Exception as e:
+ logger.warning(f"โ ๏ธ Could not extract table rows: {str(e)}")
+ # Fallback: organize cells into rows based on a reasonable assumption
+ # If we can't find rows, we'll create a single row with all cells
+ if cells_data:
+ table_rows = [cells_data]
+
+ if not table_rows:
+ logger.warning("โ ๏ธ No table rows extracted")
+ return []
+
+ # Convert to CSV format
+ import csv
+ import io
+
+ csv_buffer = io.StringIO()
+ csv_writer = csv.writer(csv_buffer)
+
+ # Write all rows to CSV
+ for row in table_rows:
+ csv_writer.writerow(row)
+
+ csv_content = csv_buffer.getvalue()
+ csv_buffer.close()
+
+ logger.info(f"๐ Generated CSV content: {len(csv_content)} characters, {len(table_rows)} rows")
+
+ # Generate filename
+ from datetime import datetime
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ filename = f"river_levels_{timestamp}.csv"
+
+ # Save CSV file to archive
+ csv_bytes = csv_content.encode('utf-8')
+ csv_file_path = archive_file(url, csv_bytes, source, "csv")
+
+ logger.info(f"๐ CSV file saved to: {csv_file_path}")
+
+ # Create document entry
+ document = {
+ "url": url,
+ "local_path": csv_file_path,
+ "size": len(csv_bytes),
+ "title": f"River Levels Data - {datetime.now().strftime('%Y-%m-%d')}",
+ "source": source,
+ "extracted_text": f"CSV File: {filename}\nFile Path: {csv_file_path}\nTotal Rows: {len(table_rows)}\n\nPreview:\n{csv_content[:500]}...",
+ "file_type": "CSV",
+ "date": datetime.now().strftime("%Y-%m-%d")
+ }
+
+ # Increment global PDF counter (using same counter for files)
+ increment_global_pdf_count()
+
+ logger.info(f"โ
Successfully extracted table data and saved as CSV")
+ return [document]
+
+ except Exception as e:
+ logger.error(f"โ Error extracting table as CSV: {str(e)}")
+ return []
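+
+# Illustrative mapping from extracted table rows to CSV output (hypothetical
+# river-level values): rows such as
+#   [["Station", "Level (m)"], ["Belet Weyne", "4.20"]]
+# are written by csv.writer as:
+#   Station,Level (m)
+#   Belet Weyne,4.20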
+
+
+async def download_pdfs_direct(page, url: str, config: dict, source: str, start_date: str = None, end_date: str = None) -> List[dict]:
+ """
+ Approach 1: Direct PDF discovery on listing pages
+ """
+ try:
+ # Check if navigation is configured
+ navigation_selector = config.get("navigation_selector")
+ navigation_url_addition = config.get("navigation_url_addition")
+ start_page = config.get("start_page", 1)
+
+ all_pdfs = []
+ seen_pdf_urls = set() # Track unique PDF URLs to detect duplicates
+ current_page = start_page
+ consecutive_empty_pages = 0
+ max_consecutive_empty = 2 # Stop after 2 consecutive pages with no new content
+
+ # Navigate to the initial page
+ await page.goto(url, wait_until="domcontentloaded", timeout=30000)
+
+ # Handle pagination if configured
+ if navigation_selector and navigation_url_addition:
+ logger.info(f"๐งญ Navigation configured: selector={navigation_selector}, url_addition={navigation_url_addition}")
+ logger.info(f"๐ Starting from page: {start_page}")
+
+ while True:
+ logger.info(f"๐ Processing page {current_page}")
+
+ # Check MAX_PAGE_LIMIT if set
+ if MAX_PAGE_LIMIT is not None and current_page > MAX_PAGE_LIMIT:
+ logger.info(f"๐ Reached MAX_PAGE_LIMIT ({MAX_PAGE_LIMIT}), stopping pagination")
+ break
+
+ # Navigate to current page if not the first page
+ if current_page > start_page:
+ nav_url_addition = navigation_url_addition.replace("{page_no}", str(current_page))
+ nav_url = construct_navigation_url(url, nav_url_addition)
+ logger.info(f"๐งญ Navigating to: {nav_url}")
+ await page.goto(nav_url, wait_until="domcontentloaded", timeout=30000)
+ # Check for recaptcha and wait if present
+ captcha_result = await check_and_wait_for_recaptcha(page, config)
+ if captcha_result == "CAPTCHA_TIMEOUT":
+ logger.error("โ Captcha detected but not solved within timeout period")
+ return []
+
+ # Check if navigation element exists for next page
+ nav_element = await page.query_selector(navigation_selector)
+ if current_page == start_page and nav_element:
+ logger.info("โ
Navigation element found, more pages available")
+ elif current_page > start_page and not nav_element:
+ logger.info("๐ No more navigation elements found, stopping pagination")
+ break
+
+ # Check global PDF limit before processing page
+ if is_pdf_limit_reached():
+ logger.info(f"๐ Global PDF limit reached ({MAX_PDF_LIMIT}), stopping pagination")
+ break
+
+ # Extract PDFs from current page
+ page_pdfs = await extract_pdfs_from_current_page(page, config, source, start_date, end_date)
+
+ if page_pdfs:
+ # Check for new (non-duplicate) PDFs
+ new_pdfs = []
+ for pdf in page_pdfs:
+ pdf_url = pdf.get("url", "")
+ if pdf_url and pdf_url not in seen_pdf_urls:
+ seen_pdf_urls.add(pdf_url)
+ new_pdfs.append(pdf)
+
+ if new_pdfs:
+ all_pdfs.extend(new_pdfs)
+ consecutive_empty_pages = 0 # Reset counter
+ logger.info(f"๐ Found {len(new_pdfs)} new PDFs on page {current_page} (total: {len(page_pdfs)} PDFs on page)")
+ else:
+ consecutive_empty_pages += 1
+ logger.info(f"๐ No new PDFs found on page {current_page} (all {len(page_pdfs)} PDFs were duplicates)")
+
+ # Stop if we've had too many consecutive pages with no new content
+ if consecutive_empty_pages >= max_consecutive_empty:
+ logger.info(f"๐ Stopping pagination: {consecutive_empty_pages} consecutive pages with no new content")
+ break
+ else:
+ consecutive_empty_pages += 1
+ logger.info(f"๐ No PDFs found on page {current_page}")
+
+ # Stop if we've had too many consecutive pages with no content
+ if consecutive_empty_pages >= max_consecutive_empty:
+ logger.info(f"๐ Stopping pagination: {consecutive_empty_pages} consecutive pages with no content")
+ break
+
+ current_page += 1
+
+ else:
+ # No pagination configured, scrape single page only
+ logger.info("๐ No navigation configured - scraping single page only")
+ page_pdfs = await extract_pdfs_from_current_page(page, config, source, start_date, end_date)
+ all_pdfs.extend(page_pdfs)
+
+ logger.info(f"๐ Total unique PDFs found across all pages: {len(all_pdfs)}")
+ return all_pdfs
+
+ except Exception as e:
+ logger.error(f"โ Error in direct PDF discovery: {str(e)}")
+ return []
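+
+# Pagination above substitutes the page number into navigation_url_addition before
+# calling construct_navigation_url, e.g. (hypothetical config) "?page={page_no}" on
+# page 3 of "https://example.org/docs" becomes "https://example.org/docs?page=3".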
+
+
+async def download_pdfs_via_page_links(page, url: str, config: dict, source: str, start_date: str = None, end_date: str = None) -> List[dict]:
+ """
+ Approach 2: Page links first, then PDF discovery
+ 1. Go through pagination to collect all page links
+ 2. Visit each individual page link
+ 3. Find and download PDFs from each page
+ """
+ try:
+ logger.info("๐ Starting Approach 2: Page links first, then PDF discovery")
+
+ # Step 1: Collect all page links through pagination
+ logger.info("๐ Step 1: Collecting all page links through pagination")
+ all_page_links = await collect_all_page_links(page, url, config, source)
+
+ if not all_page_links:
+ logger.warning("โ ๏ธ No page links found")
+ return []
+
+ logger.info(f"๐ Collected {len(all_page_links)} page links")
+
+ # Step 2: Visit each page link and extract PDFs
+ logger.info("๐ Step 2: Visiting individual pages to find PDFs")
+ all_pdfs = []
+ seen_pdf_urls = set()
+
+ for i, page_url in enumerate(all_page_links, 1):
+ if scraping_cancelled():
+ logger.info("๐ Scraping cancelled, stopping PDF downloads")
+ break
+
+ # Check global PDF limit before processing page
+ if is_pdf_limit_reached():
+ logger.info(f"๐ Global PDF limit reached ({MAX_PDF_LIMIT}), stopping page processing")
+ break
+
+ logger.info(f"๐ Processing page {i}/{len(all_page_links)}: {page_url}")
+ logger.info(f"๐ Global PDF count: {get_global_pdf_count()}/{MAX_PDF_LIMIT}")
+
+ try:
+ # Navigate to the individual page
+ await page.goto(page_url, wait_until="domcontentloaded", timeout=30000)
+
+ # Check for recaptcha and wait if present
+ captcha_result = await check_and_wait_for_recaptcha(page, config)
+ if captcha_result == "CAPTCHA_TIMEOUT":
+ logger.error("โ Captcha detected but not solved within timeout period")
+ return [{
+ "title": "CAPTCHA_ERROR",
+ "content": "Captcha detected. Please try again later. The website requires captcha verification which could not be completed automatically.",
+ "date": datetime.now().strftime("%Y-%m-%d"),
+ "url": page_url
+ }]
+
+ # Extract title from this individual page using title selector (for Approach 2)
+ page_title = ""
+
+ # For MOPND, use the cached title from the listing page
+ if source == "mopnd":
+ # Try exact match first
+ if page_url in mopnd_article_titles:
+ page_title = mopnd_article_titles[page_url]
+ logger.info(f"๐ Using MOPND cached title from listing page: {page_title}")
+ else:
+ # Try to find a matching URL (handle query params, trailing slashes)
+ page_url_parsed = urlparse(page_url)
+ page_url_normalized = urlunparse((page_url_parsed.scheme, page_url_parsed.netloc, page_url_parsed.path, '', '', ''))
+
+ # Try normalized URL
+ matching_url = None
+ for cached_url in mopnd_article_titles.keys():
+ cached_parsed = urlparse(cached_url)
+ cached_normalized = urlunparse((cached_parsed.scheme, cached_parsed.netloc, cached_parsed.path, '', '', ''))
+ if cached_normalized == page_url_normalized:
+ matching_url = cached_url
+ break
+
+ if matching_url:
+ page_title = mopnd_article_titles[matching_url]
+ logger.info(f"๐ Using MOPND cached title (matched normalized URL): {page_title}")
+ else:
+ logger.warning(f"โ ๏ธ MOPND title not found in cache for URL: {page_url}")
+ logger.debug(f"๐ Available URLs in cache: {list(mopnd_article_titles.keys())[:3]}")
+ else:
+ # For other sites, extract title from individual page
+ title_selector = config.get("title")
+ if title_selector:
+ try:
+ title_element = await page.query_selector(title_selector)
+ if title_element:
+ page_title = await title_element.text_content()
+ if page_title:
+ page_title = page_title.strip()
+ logger.info(f"๐ Extracted title from page: {page_title}")
+ else:
+ logger.debug(f"โ ๏ธ Title element found but no text content")
+ else:
+ logger.debug(f"โ ๏ธ Title element not found with selector: {title_selector}")
+ except Exception as e:
+ logger.warning(f"โ ๏ธ Error extracting title from page: {str(e)}")
+
+ # Extract PDFs from this page, using page title for PDFs (Approach 2 behavior)
+ page_pdfs = await extract_pdfs_from_current_page(page, config, source, start_date, end_date, use_page_title_for_pdfs=True, page_title=page_title)
+
+ if page_pdfs:
+ # Check for new (non-duplicate) PDFs
+ new_pdfs = []
+ for pdf in page_pdfs:
+ pdf_url = pdf.get("url", "")
+ if pdf_url and pdf_url not in seen_pdf_urls:
+ seen_pdf_urls.add(pdf_url)
+ new_pdfs.append(pdf)
+
+ if new_pdfs:
+ all_pdfs.extend(new_pdfs)
+ logger.info(f"๐ Found {len(new_pdfs)} new PDFs on page {i} (total: {len(page_pdfs)} PDFs on page)")
+ else:
+ logger.info(f"๐ No new PDFs found on page {i} (all {len(page_pdfs)} PDFs were duplicates)")
+ else:
+ logger.info(f"๐ No PDFs found on page {i}")
+
+ except Exception as e:
+ logger.error(f"โ Error processing page {i} ({page_url}): {str(e)}")
+ continue
+
+ logger.info(f"๐ Total unique PDFs found across all pages: {len(all_pdfs)}")
+
+ # Debug: Log the structure of returned PDFs
+ if all_pdfs:
+ logger.info(f"๐ Sample PDF structure: {all_pdfs[0]}")
+ else:
+ logger.warning("โ ๏ธ No PDFs found - this might be the issue")
+
+ return all_pdfs
+
+ except Exception as e:
+ logger.error(f"โ Error in page-links-first approach: {str(e)}")
+ return []
+
+
+async def check_and_wait_for_recaptcha(page, config: dict):
+ """
+ Check if a recaptcha is present on the page and wait for the user to solve it
+
+ Returns:
+ True if a recaptcha was detected and solved, False if none was detected or an
+ error occurred, or the string "CAPTCHA_TIMEOUT" if it was not solved in time
+ """
+ from scraper_common import set_captcha_status, clear_captcha_status
+
+ recaptcha_text = config.get("recaptcha_text")
+ if not recaptcha_text:
+ return False
+
+ try:
+ # Check if recaptcha text appears on the page
+ page_content = await page.content()
+ if recaptcha_text.lower() in page_content.lower():
+ logger.warning(f"๐ก๏ธ Recaptcha detected on page: {recaptcha_text}")
+ logger.info("โณ Waiting for user to solve recaptcha (max 60 seconds)...")
+ logger.info("๐ก Please solve the recaptcha in the browser window")
+
+ # Set captcha status for UI
+ set_captcha_status("๐ก๏ธ Captcha detected! Please complete the captcha challenge in the browser window. Waiting for you to solve it...")
+
+ # Wait for recaptcha to disappear (text should no longer be on page)
+ max_wait_time = 60 # seconds
+ wait_interval = 2 # check every 2 seconds
+ waited_time = 0
+
+ while waited_time < max_wait_time:
+ await asyncio.sleep(wait_interval)
+ waited_time += wait_interval
+
+ # Update status message with remaining time
+ remaining_time = max_wait_time - waited_time
+ set_captcha_status(f"๐ก๏ธ Captcha detected! Please complete the captcha challenge in the browser window. Time remaining: {remaining_time}s...")
+
+ # Check if recaptcha text is still present
+ current_content = await page.content()
+ if recaptcha_text.lower() not in current_content.lower():
+ logger.info("โ
Recaptcha appears to be solved, continuing...")
+ # Clear captcha status
+ clear_captcha_status()
+ # Wait a bit more for page to fully load after recaptcha
+ await asyncio.sleep(2)
+ return True
+
+ logger.debug(f"โณ Still waiting for recaptcha to be solved... ({waited_time}/{max_wait_time}s)")
+
+ logger.warning(f"โ ๏ธ Recaptcha wait timeout ({max_wait_time}s). Continuing anyway...")
+ # Clear captcha status
+ clear_captcha_status()
+ # Return a special value to indicate captcha timeout
+ return "CAPTCHA_TIMEOUT"
+ else:
+ # No captcha detected, clear any previous status
+ clear_captcha_status()
+
+ except Exception as e:
+ logger.warning(f"โ ๏ธ Error checking for recaptcha: {str(e)}")
+ clear_captcha_status()
+ return False
+
+ return False
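+
+# Caller pattern (as used above and below): the return value is either a bool or
+# the sentinel string "CAPTCHA_TIMEOUT", so callers compare against the sentinel
+# explicitly rather than testing truthiness:
+#
+#   captcha_result = await check_and_wait_for_recaptcha(page, config)
+#   if captcha_result == "CAPTCHA_TIMEOUT":
+#       return []  # abandon this page/listing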
+
+
+async def collect_all_page_links(page, url: str, config: dict, source: str) -> List[str]:
+ """
+ Collect all page links through pagination
+ """
+ try:
+ logger.info("๐ Starting page link collection through pagination")
+
+ # Check if navigation is configured
+ navigation_selector = config.get("navigation_selector")
+ navigation_url_addition = config.get("navigation_url_addition")
+ start_page = config.get("start_page", 1)
+ page_links_selector = config.get("page_links")
+
+ if not page_links_selector:
+ logger.error("โ No page_links selector configured")
+ return []
+
+ all_page_links = []
+ seen_page_urls = set() # Track unique page URLs to detect duplicates
+ current_page = start_page
+ consecutive_empty_pages = 0
+ max_consecutive_empty = 2 # Stop after 2 consecutive pages with no new content
+
+ # Navigate to the initial page
+ await page.goto(url, wait_until="domcontentloaded", timeout=30000)
+
+ # Check for recaptcha and wait if present
+ captcha_result = await check_and_wait_for_recaptcha(page, config)
+ if captcha_result == "CAPTCHA_TIMEOUT":
+ logger.error("โ Captcha detected but not solved within timeout period")
+ return [{
+ "title": "CAPTCHA_ERROR",
+ "content": "Captcha detected. Please try again later. The website requires captcha verification which could not be completed automatically.",
+ "date": datetime.now().strftime("%Y-%m-%d"),
+ "url": url
+ }]
+
+ # Handle pagination if configured
+ if navigation_selector and navigation_url_addition:
+ logger.info(f"๐งญ Navigation configured: selector={navigation_selector}, url_addition={navigation_url_addition}")
+ logger.info(f"๐ Starting from page: {start_page}")
+
+ while True:
+ logger.info(f"๐ Collecting page links from page {current_page}")
+
+ # Check MAX_PAGE_LIMIT if set
+ if MAX_PAGE_LIMIT is not None and current_page > MAX_PAGE_LIMIT:
+ logger.info(f"๐ Reached MAX_PAGE_LIMIT ({MAX_PAGE_LIMIT}), stopping pagination")
+ break
+
+ # Navigate to current page if not the first page
+ if current_page > start_page:
+ nav_url_addition = navigation_url_addition.replace("{page_no}", str(current_page))
+ nav_url = construct_navigation_url(url, nav_url_addition)
+ logger.info(f"๐งญ Navigating to: {nav_url}")
+ await page.goto(nav_url, wait_until="domcontentloaded", timeout=30000)
+ # Check for recaptcha and wait if present
+ captcha_result = await check_and_wait_for_recaptcha(page, config)
+ if captcha_result == "CAPTCHA_TIMEOUT":
+ logger.error("โ Captcha detected but not solved within timeout period")
+ return []
+
+ # Check if navigation element exists for next page
+ nav_element = await page.query_selector(navigation_selector)
+ if current_page == start_page and nav_element:
+ logger.info("โ
Navigation element found, more pages available")
+
+ elif current_page > start_page and not nav_element:
+ logger.info("๐ No more navigation elements found, stopping pagination")
+ break
+
+ # Extract page links from current page
+ # Use MOPND-specific function if this is MOPND
+ if source == "mopnd":
+ page_links = await extract_mopnd_page_links_with_dates(page, config)
+ else:
+ page_links = await extract_page_links_from_current_page(page, config)
+
+ if page_links:
+ # Check for new (non-duplicate) page links
+ new_page_links = []
+ for page_link in page_links:
+ if page_link and page_link not in seen_page_urls:
+ seen_page_urls.add(page_link)
+ new_page_links.append(page_link)
+
+ if new_page_links:
+ all_page_links.extend(new_page_links)
+ consecutive_empty_pages = 0 # Reset counter
+ logger.info(f"๐ Found {len(new_page_links)} new page links on page {current_page} (total: {len(page_links)} page links on page)")
+ else:
+ consecutive_empty_pages += 1
+ logger.info(f"๐ No new page links found on page {current_page} (all {len(page_links)} page links were duplicates)")
+
+ # Stop if we've had too many consecutive pages with no new content
+ if consecutive_empty_pages >= max_consecutive_empty:
+ logger.info(f"๐ Stopping pagination: {consecutive_empty_pages} consecutive pages with no new content")
+ break
+ else:
+ consecutive_empty_pages += 1
+ logger.info(f"๐ No page links found on page {current_page}")
+
+ # Stop if we've had too many consecutive pages with no content
+ if consecutive_empty_pages >= max_consecutive_empty:
+ logger.info(f"๐ Stopping pagination: {consecutive_empty_pages} consecutive pages with no content")
+ break
+
+ current_page += 1
+
+ else:
+ # No pagination configured, scrape single page only
+ logger.info("๐ No navigation configured - collecting page links from single page only")
+ # Use MOPND-specific function if this is MOPND
+ if source == "mopnd":
+ page_links = await extract_mopnd_page_links_with_dates(page, config)
+ else:
+ page_links = await extract_page_links_from_current_page(page, config)
+ all_page_links.extend(page_links)
+
+ logger.info(f"๐ Total unique page links collected: {len(all_page_links)}")
+ return all_page_links
+
+ except Exception as e:
+ logger.error(f"โ Error collecting page links: {str(e)}")
+ return []
+
+
+async def extract_page_links_from_current_page(page, config: dict) -> List[str]:
+ """
+ Extract page links from the current page
+ """
+ try:
+ # Get page links from the page
+ page_links = []
+ page_links_selector = config.get("page_links")
+
+ if isinstance(page_links_selector, list):
+ for selector in page_links_selector:
+ logger.info(f"๐ Looking for page links with selector: {selector}")
+ elements = await page.query_selector_all(selector)
+ logger.info(f"๐ฐ Found {len(elements)} elements with selector: {selector}")
+ for element in elements:
+ href = await element.get_attribute("href")
+ if href:
+ absolute_url = convert_to_absolute_url(href, page.url)
+ page_links.append(absolute_url)
+ else:
+ # If the element itself doesn't have href, look for a link within it or its parent
+ # First, try to find an <a> tag within the element
+ link_element = await element.query_selector("a")
+ if link_element:
+ href = await link_element.get_attribute("href")
+ if href:
+ absolute_url = convert_to_absolute_url(href, page.url)
+ page_links.append(absolute_url)
+ continue
+
+ # If no link found within, try to find in parent element
+ try:
+ parent = await element.evaluate_handle("el => el.parentElement")
+ if parent:
+ parent_link = await parent.query_selector("a")
+ if parent_link:
+ href = await parent_link.get_attribute("href")
+ if href:
+ absolute_url = convert_to_absolute_url(href, page.url)
+ page_links.append(absolute_url)
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Could not find link in parent: {str(e)}")
+ elif isinstance(page_links_selector, str):
+ logger.info(f"๐ Looking for page links with selector: {page_links_selector}")
+ elements = await page.query_selector_all(page_links_selector)
+ logger.info(f"๐ฐ Found {len(elements)} elements with selector: {page_links_selector}")
+ for element in elements:
+ href = await element.get_attribute("href")
+ if href:
+ absolute_url = convert_to_absolute_url(href, page.url)
+ page_links.append(absolute_url)
+ else:
+ # If the element itself doesn't have href, look for a link within it or its parent
+ # First, try to find an <a> tag within the element
+ link_element = await element.query_selector("a")
+ if link_element:
+ href = await link_element.get_attribute("href")
+ if href:
+ absolute_url = convert_to_absolute_url(href, page.url)
+ page_links.append(absolute_url)
+ continue
+
+ # If no link found within, try to find in parent element
+ try:
+ parent = await element.evaluate_handle("el => el.parentElement")
+ if parent:
+ parent_link = await parent.query_selector("a")
+ if parent_link:
+ href = await parent_link.get_attribute("href")
+ if href:
+ absolute_url = convert_to_absolute_url(href, page.url)
+ page_links.append(absolute_url)
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Could not find link in parent: {str(e)}")
+
+ logger.info(f"๐ Found {len(page_links)} page links on current page")
+ return page_links
+
+ except Exception as e:
+ logger.error(f"โ Error extracting page links from current page: {str(e)}")
+ return []
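+
+# The href lookup above tries three places in order: the matched element itself,
+# an <a> inside it, then an <a> on its parent. A condensed sketch of that order
+# (hypothetical helper, assuming a Playwright ElementHandle):
+#
+#   async def _resolve_href(element):
+#       if href := await element.get_attribute("href"):
+#           return href
+#       if link := await element.query_selector("a"):
+#           return await link.get_attribute("href")
+#       parent = await element.evaluate_handle("el => el.parentElement")
+#       if parent and (link := await parent.query_selector("a")):
+#           return await link.get_attribute("href")
+#       return None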
+
+
+async def extract_mopnd_page_links_with_dates(page, config: dict) -> List[str]:
+ """
+ Extract MOPND page links with dates and titles (special handling for MOPND)
+ """
+ try:
+ logger.info("๐ Extracting MOPND page links with dates and titles")
+
+ # Get page link selector
+ page_links_selector = config.get("page_links")
+ if not page_links_selector:
+ logger.warning("โ ๏ธ No page_links selector found in config")
+ return []
+
+ # Get date selector
+ date_selector = config.get("date")
+ if not date_selector:
+ logger.warning("โ ๏ธ No date selector found in config")
+ return []
+
+ # Get title selector
+ title_selector = config.get("title")
+ if not title_selector:
+ logger.warning("โ ๏ธ No title selector found in config")
+ return []
+
+ # Get all page link elements
+ logger.info(f"๐ Looking for page links with selector: {page_links_selector}")
+ link_elements = await page.query_selector_all(page_links_selector)
+ logger.info(f"๐ฐ Found {len(link_elements)} page link elements")
+
+ # Get all date elements
+ logger.info(f"๐ Looking for dates with selector: {date_selector}")
+ date_elements = await page.query_selector_all(date_selector)
+ logger.info(f"๐
Found {len(date_elements)} date elements")
+
+ # Note: For MOPND, title is extracted from link text itself since title selector is same as page_links
+
+ # Extract links, dates, and titles
+ page_links = []
+ for i, link_element in enumerate(link_elements):
+ try:
+ # Get the href attribute
+ href = await link_element.get_attribute("href")
+ if href:
+ # Convert to absolute URL
+ absolute_url = convert_to_absolute_url(href, page.url)
+ page_links.append(absolute_url)
+
+ # Extract title from the link text itself (since title selector is same as page_links)
+ try:
+ title_text = await link_element.text_content()
+ if title_text and title_text.strip():
+ # Store the title for this page URL
+ mopnd_article_titles[absolute_url] = title_text.strip()
+ logger.debug(f"โ
Stored title for {absolute_url}: {title_text.strip()}")
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Could not extract title from link {i}: {str(e)}")
+
+ # Try to get corresponding date
+ # First try by index (assuming same order)
+ date_found = False
+ if i < len(date_elements):
+ try:
+ date_text = await date_elements[i].text_content()
+ if date_text and date_text.strip():
+ # Store the date for this page URL
+ mopnd_article_dates[absolute_url] = date_text.strip()
+ logger.debug(f"โ
Stored date for {absolute_url}: {date_text.strip()}")
+ date_found = True
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Could not extract date for link {i}: {str(e)}")
+
+ # If date not found by index, try to find it in the same parent container
+ if not date_found:
+ try:
+ # Get the parent element of the link (look for common container classes)
+ parent = await link_element.evaluate_handle("el => el.closest('.post_info, .post, [class*=\"post\"], [class*=\"item\"], [class*=\"entry\"]')")
+ if parent:
+ # Try to find date element within the same parent
+ date_in_parent = await parent.query_selector(date_selector)
+ if date_in_parent:
+ date_text = await date_in_parent.text_content()
+ if date_text and date_text.strip():
+ mopnd_article_dates[absolute_url] = date_text.strip()
+ logger.debug(f"โ
Stored date from parent container for {absolute_url}: {date_text.strip()}")
+ date_found = True
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Could not find date in parent container: {str(e)}")
+
+ if not date_found:
+ logger.warning(f"โ ๏ธ Could not extract date for link {i} ({absolute_url})")
+
+ except Exception as e:
+ logger.warning(f"โ Error extracting link {i}: {str(e)}")
+ continue
+
+ logger.info(f"๐ Found {len(page_links)} MOPND page links with dates and titles")
+ logger.info(f"๐ Stored {len(mopnd_article_titles)} titles and {len(mopnd_article_dates)} dates")
+
+ # Debug: Show first few stored titles and dates
+ if mopnd_article_titles:
+ sample_titles = list(mopnd_article_titles.items())[:3]
+ logger.debug(f"๐ Sample titles: {sample_titles}")
+ if mopnd_article_dates:
+ sample_dates = list(mopnd_article_dates.items())[:3]
+ logger.debug(f"๐ Sample dates: {sample_dates}")
+
+ return page_links
+
+ except Exception as e:
+ logger.error(f"โ Error extracting MOPND page links: {str(e)}")
+ return []
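+
+# After a MOPND listing page has been processed, the module-level caches hold one
+# entry per article, keyed by the absolute article URL (values are illustrative):
+#
+#   mopnd_article_titles["https://mopnd.example/article-1"] = "Drought Update"
+#   mopnd_article_dates["https://mopnd.example/article-1"]  = "12 March 2024"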
+
+
+async def _extract_nbs_pdfs_grouped_by_title(page, config: dict, source: str, start_date: str = None, end_date: str = None) -> List[dict]:
+ """
+ Special NBS handler: Multiple titles on one page, each title can have multiple PDFs
+ Approach 1: Extract all titles and PDFs, then group PDFs sequentially by title
+ """
+ try:
+ logger.info(f"๐ท NBS special handling (Approach 1): Processing multiple titles with grouped PDFs")
+
+ # Extract all titles from the page in order
+ title_selector = config.get("title")
+ titles = []
+ if title_selector:
+ try:
+ title_elements = await page.query_selector_all(title_selector)
+ for element in title_elements:
+ try:
+ title_text = await element.text_content()
+ if title_text:
+ title_text = title_text.strip()
+ titles.append(title_text)
+ logger.debug(f"๐ Found title: {title_text}")
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Could not extract title text: {str(e)}")
+ except Exception as e:
+ logger.warning(f"โ ๏ธ Error extracting titles: {str(e)}")
+
+ if not titles:
+ logger.warning("โ ๏ธ No titles found on NBS page, falling back to standard processing")
+ return []
+
+ logger.info(f"๐ Found {len(titles)} titles on page")
+
+ # Extract all PDF links in order
+ pdf_selector = config.get("pdf_links")
+ all_pdf_links = []
+ if isinstance(pdf_selector, list):
+ for selector in pdf_selector:
+ try:
+ elements = await page.query_selector_all(selector)
+ for element in elements:
+ href = await element.get_attribute("href")
+ if href:
+ absolute_url = convert_to_absolute_url(href, page.url)
+ try:
+ link_text = await element.text_content()
+ pdf_name = link_text.strip() if link_text else ""
+ except:
+ pdf_name = ""
+
+ if not pdf_name:
+ url_path = urlparse(absolute_url).path
+ if url_path:
+ pdf_name = unquote(os.path.basename(url_path))
+ if pdf_name.lower().endswith('.pdf'):
+ pdf_name = pdf_name[:-4]
+
+ # Skip PDFs with "Read More" as the name (not actual PDF names)
+ if pdf_name and pdf_name.strip().lower() == "read more":
+ logger.debug(f"โญ๏ธ Skipping PDF with 'Read More' name: {absolute_url}")
+ continue
+
+ all_pdf_links.append({
+ "url": absolute_url,
+ "name": pdf_name
+ })
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Error with PDF selector '{selector}': {str(e)}")
+ elif isinstance(pdf_selector, str):
+ try:
+ elements = await page.query_selector_all(pdf_selector)
+ for element in elements:
+ href = await element.get_attribute("href")
+ if href:
+ absolute_url = convert_to_absolute_url(href, page.url)
+ try:
+ link_text = await element.text_content()
+ pdf_name = link_text.strip() if link_text else ""
+ except:
+ pdf_name = ""
+
+ if not pdf_name:
+ url_path = urlparse(absolute_url).path
+ if url_path:
+ pdf_name = unquote(os.path.basename(url_path))
+ if pdf_name.lower().endswith('.pdf'):
+ pdf_name = pdf_name[:-4]
+
+ # Skip PDFs with "Read More" as the name (not actual PDF names)
+ if pdf_name and pdf_name.strip().lower() == "read more":
+ logger.debug(f"โญ๏ธ Skipping PDF with 'Read More' name: {absolute_url}")
+ continue
+
+ all_pdf_links.append({
+ "url": absolute_url,
+ "name": pdf_name
+ })
+ except Exception as e:
+ logger.warning(f"โ ๏ธ Error extracting PDF elements: {str(e)}")
+
+ logger.info(f"๐ Found {len(all_pdf_links)} PDF links on page")
+
+ if not all_pdf_links:
+ logger.warning("โ ๏ธ No PDF links found on NBS page")
+ return []
+
+ # Group PDFs by title: Divide PDFs evenly among titles, or use sequential matching
+ # Simple approach: Divide PDFs evenly among titles
+ pdfs_per_title = len(all_pdf_links) // len(titles) if len(titles) > 0 else 0
+ remainder = len(all_pdf_links) % len(titles)
+
+ title_pdf_groups = []
+ pdf_index = 0
+
+ for i, title in enumerate(titles):
+ # Calculate how many PDFs this title gets
+ num_pdfs = pdfs_per_title + (1 if i < remainder else 0)
+
+ # Get PDFs for this title
+ title_pdfs = all_pdf_links[pdf_index:pdf_index + num_pdfs]
+ pdf_index += num_pdfs
+
+ if title_pdfs:
+ title_pdf_groups.append({
+ "title": title,
+ "pdfs": title_pdfs
+ })
+ logger.info(f"๐ Title '{title}': {len(title_pdfs)} associated PDFs")
+
+ if not title_pdf_groups:
+ logger.warning("โ ๏ธ No title-PDF groups created")
+ return []
+
+ # Extract dates from page
+ date_selector = config.get("date")
+ date_elements = []
+ if date_selector:
+ try:
+ date_elements = await page.query_selector_all(date_selector)
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Could not extract date elements: {str(e)}")
+
+ # Process each title group: Try all PDFs, if some work, create document
+ all_documents = []
+
+ for group_idx, group in enumerate(title_pdf_groups):
+ if scraping_cancelled():
+ logger.info("๐ Scraping cancelled, stopping NBS processing")
+ break
+
+ if is_pdf_limit_reached():
+ logger.info(f"๐ Global PDF limit reached ({MAX_PDF_LIMIT}), stopping NBS processing")
+ break
+
+ title = group["title"]
+ pdf_list = group["pdfs"]
+
+ logger.info(f"๐ท Processing title {group_idx+1}/{len(title_pdf_groups)}: '{title}' ({len(pdf_list)} PDFs)")
+
+ # Try all PDFs for this title
+ successful_pdfs = []
+ combined_text_parts = []
+ all_pdf_paths = []
+ total_size = 0
+
+ for pdf_idx, pdf_info in enumerate(pdf_list):
+ if scraping_cancelled():
+ break
+
+ if is_pdf_limit_reached():
+ break
+
+ pdf_url = pdf_info["url"]
+ pdf_link_name = pdf_info.get("name", "") or f"PDF {pdf_idx+1}"
+
+ # Skip PDFs with "Read More" as the name (not actual PDF names)
+ if pdf_link_name and pdf_link_name.strip().lower() == "read more":
+ logger.info(f" โญ๏ธ Skipping PDF with 'Read More' name: {pdf_url}")
+ continue
+
+ logger.info(f" โฌ๏ธ Trying PDF {pdf_idx+1}/{len(pdf_list)}: {pdf_link_name}")
+
+ try:
+ download_result = download_and_save_pdf(pdf_url, source)
+ if download_result["success"]:
+ local_pdf_path = download_result["path"]
+ extracted_text = extract_text_from_pdf_file(local_pdf_path)
+
+ if extracted_text and len(extracted_text.strip()) > 10:
+ current_count = increment_global_pdf_count()
+
+ successful_pdfs.append({
+ "url": pdf_url,
+ "path": local_pdf_path,
+ "name": pdf_link_name,
+ "size": download_result["size"],
+ "text": extracted_text
+ })
+
+ combined_text_parts.append(f"=== {pdf_link_name} ===\n{extracted_text}")
+ all_pdf_paths.append(local_pdf_path)
+ total_size += download_result["size"]
+
+                            logger.info(f"    โ Successfully processed PDF '{pdf_link_name}' (Global: {current_count}/{MAX_PDF_LIMIT})")
+ else:
+ logger.warning(f" โ ๏ธ PDF downloaded but no text extracted: {pdf_link_name}")
+ else:
+ logger.warning(f" โ Failed to download PDF: {download_result.get('message', 'Unknown error')}")
+ except Exception as e:
+ logger.error(f" โ Error processing PDF: {str(e)}")
+ continue
+
+            # Create a document if at least one PDF succeeded (Approach 1: keep whatever PDFs worked)
+ if successful_pdfs:
+ # Extract date (use first date element or group index if multiple dates)
+ pdf_date_raw = ""
+ if date_elements:
+ date_idx = min(group_idx, len(date_elements) - 1)
+ try:
+ date_text = await date_elements[date_idx].text_content()
+ if date_text:
+ pdf_date_raw = date_text.strip()
+                    except Exception:
+                        pass
+
+ # Standardize the date to YYYY-MM-DD format
+ pdf_date = standardize_date(pdf_date_raw, default_to_current=True)
+ if not pdf_date:
+ pdf_date = datetime.now().strftime("%Y-%m-%d")
+
+ # Check date range filtering
+ if start_date or end_date:
+ start_dt = parse_date_input(start_date) if start_date else None
+ end_dt = parse_date_input(end_date) if end_date else None
+ if not is_date_in_range(pdf_date, start_dt, end_dt, include_missing=False):
+                        logger.info(f"๐ Title date {pdf_date} is outside date range - skipping")
+ continue
+
+ # Combine all PDF texts
+ combined_text = "\n\n".join(combined_text_parts)
+ primary_path = all_pdf_paths[0] if all_pdf_paths else ""
+
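+            # One document per title: url/local_path point at the first successful PDF, while
+            # extracted_text and nbs_all_paths cover every PDF that succeeded for this title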
+ all_documents.append({
+ "url": successful_pdfs[0]["url"],
+ "local_path": primary_path,
+ "size": total_size,
+ "title": title,
+ "source": source,
+ "extracted_text": combined_text,
+ "file_type": "PDF",
+ "date": pdf_date,
+ "nbs_pdf_count": len(successful_pdfs),
+ "nbs_all_paths": all_pdf_paths
+ })
+
+                logger.info(f"โ Created document for title '{title}' with {len(successful_pdfs)}/{len(pdf_list)} successful PDFs")
+ else:
+ logger.warning(f"โ ๏ธ No PDFs successfully processed for title: '{title}' - moving forward")
+
+ logger.info(f"๐ NBS Processing Summary: {len(all_documents)} documents created from {len(title_pdf_groups)} titles")
+ return all_documents
+
+ except Exception as e:
+ logger.error(f"โ Error in NBS PDF extraction: {str(e)}")
+ return []
+
+
+async def extract_pdfs_from_current_page(page, config: dict, source: str, start_date: str = None, end_date: str = None, use_page_title_for_pdfs: bool = False, page_title: str = None) -> List[dict]:
+ """
+ Extract PDFs from the current page
+ Special handling for NBS: Multiple titles on one page, each title can have multiple PDFs
+
+ Args:
+ page: Playwright page object
+ config: Website configuration dict
+ source: Source name
+ start_date: Optional start date for filtering
+ end_date: Optional end date for filtering
+ use_page_title_for_pdfs: If True, use page title for PDFs (Approach 2 behavior)
+ page_title: Pre-extracted page title (optional, will extract if not provided and use_page_title_for_pdfs is True)
+ """
+ try:
+ # Special handling for NBS: Group PDFs by title
+ is_nbs = source.lower() in ["nbs", "nbs somalia"]
+ if is_nbs:
+ return await _extract_nbs_pdfs_grouped_by_title(page, config, source, start_date, end_date)
+
+ # Standard handling for other sources: Each PDF/file gets its own document
+ # Get PDF links from the page (with link text for name extraction)
+ pdf_links = []
+ pdf_selector = config.get("pdf_links")
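+        # config["pdf_links"] may be a single CSS selector string or a list of selectors; both forms are handled below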
+
+ if isinstance(pdf_selector, list):
+ for selector in pdf_selector:
+ elements = await page.query_selector_all(selector)
+ for element in elements:
+ # Try href first, then button-url (for FEWS custom elements)
+ href = await element.get_attribute("href")
+ if not href:
+ href = await element.get_attribute("button-url")
+ if href:
+ absolute_url = convert_to_absolute_url(href, page.url)
+ # Extract link text for PDF name
+ try:
+ link_text = await element.text_content()
+ pdf_name = link_text.strip() if link_text else ""
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Could not extract link text: {str(e)}")
+ pdf_name = ""
+
+ # If no link text, try to extract filename from URL
+ if not pdf_name:
+ url_path = urlparse(absolute_url).path
+ if url_path:
+ pdf_name = unquote(os.path.basename(url_path))
+ # Remove .pdf extension if present (we'll add it back if needed)
+ if pdf_name.lower().endswith('.pdf'):
+ pdf_name = pdf_name[:-4]
+
+ pdf_links.append({
+ "url": absolute_url,
+ "name": pdf_name,
+ "file_type": "PDF"
+ })
+ elif isinstance(pdf_selector, str):
+ elements = await page.query_selector_all(pdf_selector)
+ for element in elements:
+ # Try href first, then button-url (for FEWS custom elements)
+ href = await element.get_attribute("href")
+ if not href:
+ href = await element.get_attribute("button-url")
+ if href:
+ absolute_url = convert_to_absolute_url(href, page.url)
+ # Extract link text for PDF name
+ try:
+ link_text = await element.text_content()
+ pdf_name = link_text.strip() if link_text else ""
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Could not extract link text: {str(e)}")
+ pdf_name = ""
+
+ # If no link text, try to extract filename from URL
+ if not pdf_name:
+ from urllib.parse import unquote
+ url_path = urlparse(absolute_url).path
+ if url_path:
+ pdf_name = unquote(os.path.basename(url_path))
+ # Remove .pdf extension if present (we'll add it back if needed)
+ if pdf_name.lower().endswith('.pdf'):
+ pdf_name = pdf_name[:-4]
+
+ pdf_links.append({
+ "url": absolute_url,
+ "name": pdf_name,
+ "file_type": "PDF"
+ })
+
+ # Get file links (CSV, etc.) from the page if configured
+ file_links = []
+ file_selector = config.get("file_links")
+
+ if file_selector:
+ # Determine file type from URL or config
+ file_type = "CSV" # Default to CSV
+
+ if isinstance(file_selector, list):
+ for selector in file_selector:
+ elements = await page.query_selector_all(selector)
+ for element in elements:
+ href = await element.get_attribute("href")
+ if href:
+ absolute_url = convert_to_absolute_url(href, page.url)
+ # Determine file type from URL
+ if absolute_url.lower().endswith('.csv'):
+ file_type = "CSV"
+ elif absolute_url.lower().endswith(('.xlsx', '.xls')):
+ file_type = "XLSX"
+ elif absolute_url.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
+ file_type = "PNG" # Image files
+ else:
+ file_type = "CSV" # Default
+
+ # Extract link text for file name
+ try:
+ link_text = await element.text_content()
+ file_name = link_text.strip() if link_text else ""
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Could not extract link text: {str(e)}")
+ file_name = ""
+
+ # If no link text, try to extract filename from URL
+ if not file_name:
+ url_path = urlparse(absolute_url).path
+ if url_path:
+ file_name = unquote(os.path.basename(url_path))
+ # Remove file extension if present
+ for ext in ['.csv', '.xlsx', '.xls', '.png', '.jpg', '.jpeg', '.gif', '.webp']:
+ if file_name.lower().endswith(ext):
+ file_name = file_name[:-len(ext)]
+ break
+
+ file_links.append({
+ "url": absolute_url,
+ "name": file_name,
+ "file_type": file_type
+ })
+ elif isinstance(file_selector, str):
+ elements = await page.query_selector_all(file_selector)
+ for element in elements:
+ href = await element.get_attribute("href")
+ if href:
+ absolute_url = convert_to_absolute_url(href, page.url)
+ # Determine file type from URL
+ if absolute_url.lower().endswith('.csv'):
+ file_type = "CSV"
+ elif absolute_url.lower().endswith(('.xlsx', '.xls')):
+ file_type = "XLSX"
+ elif absolute_url.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
+ file_type = "PNG" # Image files
+ else:
+ file_type = "CSV" # Default
+
+ # Extract link text for file name
+ try:
+ link_text = await element.text_content()
+ file_name = link_text.strip() if link_text else ""
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Could not extract link text: {str(e)}")
+ file_name = ""
+
+ # If no link text, try to extract filename from URL
+ if not file_name:
+ url_path = urlparse(absolute_url).path
+ if url_path:
+ file_name = unquote(os.path.basename(url_path))
+ # Remove file extension if present
+ for ext in ['.csv', '.xlsx', '.xls', '.png', '.jpg', '.jpeg', '.gif', '.webp']:
+ if file_name.lower().endswith(ext):
+ file_name = file_name[:-len(ext)]
+ break
+
+ file_links.append({
+ "url": absolute_url,
+ "name": file_name,
+ "file_type": file_type
+ })
+
+ # Combine PDF and file links
+ all_links = pdf_links + file_links
+
+ logger.info(f"๐ Found {len(pdf_links)} PDF links and {len(file_links)} file links on current page (total: {len(all_links)})")
+
+ # Log CSV files specifically for debugging
+ csv_files = [link for link in file_links if link.get("file_type") == "CSV"]
+ if csv_files:
+ logger.info(f"๐ Found {len(csv_files)} CSV file(s) to process:")
+ for csv_file in csv_files:
+ logger.info(f" - CSV: {csv_file.get('name', 'Unknown')} at {csv_file.get('url', 'Unknown URL')}")
+
+ # Extract page title using the title selector from config (if not already provided)
+ if page_title is None:
+ page_title = ""
+ title_selector = config.get("title")
+ if title_selector:
+ try:
+ title_element = await page.query_selector(title_selector)
+ if title_element:
+ page_title = await title_element.text_content()
+ if page_title:
+ page_title = page_title.strip()
+ logger.info(f"๐ Extracted page title: {page_title}")
+ else:
+ logger.debug(f"โ ๏ธ Title element found but no text content")
+ else:
+ logger.debug(f"โ ๏ธ Title element not found with selector: {title_selector}")
+ except Exception as e:
+ logger.warning(f"โ ๏ธ Error extracting page title: {str(e)}")
+ elif page_title:
+ logger.info(f"๐ Using provided page title: {page_title}")
+
+ # Try to extract dates from the page for date filtering
+ date_selector = config.get("date")
+ date_elements = []
+ if date_selector:
+ try:
+ date_elements = await page.query_selector_all(date_selector)
+                logger.debug(f"๐ Found {len(date_elements)} date elements on current page")
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Could not extract date elements: {str(e)}")
+
+ # Download each PDF/file
+ downloaded_pdfs = []
+ for i, file_info in enumerate(all_links):
+ if scraping_cancelled():
+ logger.info("๐ Scraping cancelled, stopping file downloads")
+ break
+
+ # Check global PDF limit before processing
+ if is_pdf_limit_reached():
+ logger.info(f"๐ Global PDF limit reached ({MAX_PDF_LIMIT}), stopping file processing")
+ break
+
+ file_url = file_info["url"]
+ file_name = file_info.get("name", "") # Individual file name from link text
+ file_type = file_info.get("file_type", "PDF")
+
+ # Determine title priority based on context
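+            # Name priority: page title (Approach 2, when navigating to individual pages) > link text > page title fallback > generated "<TYPE> <n>"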
+ if use_page_title_for_pdfs and page_title:
+ # Approach 2: Use page title for files (when navigating to individual pages)
+ file_name = page_title
+ logger.info(f"๐ Using page title for {file_type} (Approach 2): {file_name}")
+ elif file_name and file_name != "":
+ # Approach 1: Priority to individual file link text
+ # Clean up the file name (remove extra whitespace, newlines, etc.)
+ file_name = " ".join(file_name.split())
+ logger.info(f"๐ Using {file_type} link text as name: {file_name}")
+ elif page_title:
+ # Fallback: Use page title if individual file name is missing
+ file_name = page_title
+ logger.info(f"๐ Using page title as fallback for {file_type}: {file_name}")
+ else:
+ # Last resort fallback
+ current_count = get_global_pdf_count() + 1
+ file_name = f"{file_type} {current_count}"
+ logger.info(f"๐ Using fallback name: {file_name}")
+
+ logger.info(f"โฌ๏ธ Downloading {file_type} {i+1}/{len(all_links)}: {file_url}")
+ logger.info(f"๐ {file_type} name: {file_name}")
+ logger.info(f"๐ Global PDF count: {get_global_pdf_count()}/{MAX_PDF_LIMIT}")
+
+ try:
+ # Download based on file type
+ if file_type == "PDF":
+ download_result = download_and_save_pdf(file_url, source)
+ else:
+ # For CSV and other files
+ download_result = download_and_save_file(file_url, source, file_type.lower())
+
+ if download_result["success"]:
+ local_file_path = download_result["path"]
+ extracted_text = ""
+
+ # Extract text only for PDFs
+ if file_type == "PDF":
+ logger.info(f"๐ Extracting text from local file: {local_file_path}")
+ extracted_text = extract_text_from_pdf_file(local_file_path)
+ logger.info(f"๐ Extracted text length: {len(extracted_text)} characters")
+ if not extracted_text:
+ logger.warning("โ ๏ธ No text extracted from PDF")
+ elif file_type == "CSV":
+ # Special handling for CSV files: read a preview of the content
+ try:
+ import csv
+ logger.info(f"๐ Reading CSV file preview: {local_file_path}")
+ with open(local_file_path, 'r', encoding='utf-8', errors='ignore') as csv_file:
+ csv_reader = csv.reader(csv_file)
+ # Read first 10 rows as preview
+ preview_rows = []
+ for idx, row in enumerate(csv_reader):
+ if idx >= 10:
+ break
+ preview_rows.append(row)
+
+ # Convert to text preview
+ if preview_rows:
+ # Get headers if available
+ headers = preview_rows[0] if len(preview_rows) > 0 else []
+ data_rows = preview_rows[1:] if len(preview_rows) > 1 else []
+
+ # Extract location from title for icpac_seasonal_forecast
+ location_info = ""
+ if source == "icpac_seasonal_forecast" and file_name:
+ location_info = f"Location: {file_name}\n"
+
+ # Create a readable preview
+ preview_text = f"CSV File: {file_name}\n"
+ if location_info:
+ preview_text += location_info
+ preview_text += f"File Path: {local_file_path}\n"
+ preview_text += f"Total Rows Previewed: {len(preview_rows)}\n\n"
+
+ if headers:
+ preview_text += "Headers: " + ", ".join(str(h) for h in headers) + "\n\n"
+
+ if data_rows:
+ preview_text += "Sample Data (first few rows):\n"
+ for row in data_rows[:5]: # Show first 5 data rows
+ preview_text += ", ".join(str(cell) for cell in row) + "\n"
+
+ extracted_text = preview_text
+ logger.info(f"๐ CSV preview extracted: {len(extracted_text)} characters")
+ else:
+ location_info = ""
+ if source == "icpac_seasonal_forecast" and file_name:
+ location_info = f"Location: {file_name}\n"
+ extracted_text = f"CSV File: {file_name}\n"
+ if location_info:
+ extracted_text += location_info
+ extracted_text += f"File Path: {local_file_path}\n(File is empty or could not be read)"
+ logger.warning("โ ๏ธ CSV file appears to be empty")
+ except Exception as e:
+ logger.warning(f"โ ๏ธ Could not read CSV preview: {str(e)}")
+ location_info = ""
+ if source == "icpac_seasonal_forecast" and file_name:
+ location_info = f"Location: {file_name}\n"
+ extracted_text = f"CSV File: {file_name}\n"
+ if location_info:
+ extracted_text += location_info
+ extracted_text += f"File Path: {local_file_path}\n(Preview could not be generated: {str(e)})"
+ elif file_type == "PNG":
+ # Special handling for PNG files (images) - mention location from title
+ location_info = ""
+ if source == "icpac_seasonal_forecast" and file_name:
+ location_info = f"Location: {file_name}\n"
+
+ extracted_text = f"PNG File: {file_name}\n"
+ if location_info:
+ extracted_text += location_info
+ extracted_text += f"File Path: {local_file_path}\n"
+ extracted_text += "(PNG image file downloaded successfully)"
+ logger.info(f"๐ PNG file info extracted: {file_name}")
+ else:
+ # For other file types (XLSX, etc.)
+ logger.info(f"๐ {file_type} file downloaded (no text extraction needed)")
+ extracted_text = f"{file_type} File: {file_name}\nFile Path: {local_file_path}"
+
+ # Extract date if available from listing page
+ file_date_raw = ""
+ if source == "mopnd":
+ # For MOPND, use the current page URL (not the PDF URL) to look up the date
+ current_page_url = page.url
+ # Try exact match first
+ if current_page_url in mopnd_article_dates:
+ file_date_raw = mopnd_article_dates[current_page_url]
+                            logger.debug(f"โ Using MOPND date from cache (page URL: {current_page_url}): {file_date_raw}")
+ else:
+ # Try to find a matching URL (handle query params, trailing slashes)
+ page_url_parsed = urlparse(current_page_url)
+ page_url_normalized = urlunparse((page_url_parsed.scheme, page_url_parsed.netloc, page_url_parsed.path, '', '', ''))
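+                            # e.g. (hypothetical URLs) "https://example.com/report/?page=2" and "https://example.com/report/" both normalize to "https://example.com/report/", since query strings and fragments are dropped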
+
+ # Try normalized URL
+ matching_url = None
+ for cached_url in mopnd_article_dates.keys():
+ cached_parsed = urlparse(cached_url)
+ cached_normalized = urlunparse((cached_parsed.scheme, cached_parsed.netloc, cached_parsed.path, '', '', ''))
+ if cached_normalized == page_url_normalized:
+ matching_url = cached_url
+ break
+
+ if matching_url:
+ file_date_raw = mopnd_article_dates[matching_url]
+                                logger.debug(f"โ Using MOPND date from cache (matched normalized URL): {file_date_raw}")
+ else:
+ logger.warning(f"โ ๏ธ MOPND date not found in cache for page URL: {current_page_url}")
+ logger.debug(f"๐ Available page URLs in cache: {list(mopnd_article_dates.keys())[:3]}")
+ elif i < len(date_elements):
+ try:
+ date_text = await date_elements[i].text_content()
+ if date_text:
+ file_date_raw = date_text.strip()
+                                logger.debug(f"โ Extracted raw date from listing page: {file_date_raw}")
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Could not extract date for {file_type} {i+1}: {str(e)}")
+
+ # Standardize the date to YYYY-MM-DD format
+ file_date = standardize_date(file_date_raw, default_to_current=True)
+ if not file_date:
+ file_date = datetime.now().strftime("%Y-%m-%d")
+
+ # Check date range filtering
+ if start_date or end_date:
+ start_dt = parse_date_input(start_date) if start_date else None
+ end_dt = parse_date_input(end_date) if end_date else None
+ if not is_date_in_range(file_date, start_dt, end_dt, include_missing=False):
+                            logger.info(f"๐ {file_type} date {file_date} is outside date range [{start_date}, {end_date}] - filtering out")
+ continue
+
+ # Increment global PDF counter
+ current_count = increment_global_pdf_count()
+
+ downloaded_pdfs.append({
+ "url": file_url,
+ "local_path": local_file_path,
+ "size": download_result["size"],
+ "title": file_name, # Use extracted name from link text
+ "source": source,
+ "extracted_text": extracted_text,
+ "file_type": file_type,
+ "date": file_date
+ })
+                    logger.info(f"โ Successfully downloaded and processed {file_type} '{file_name}' (Global: {current_count}/{MAX_PDF_LIMIT})")
+ else:
+ logger.warning(f"โ Failed to download {file_type} {i+1}: {download_result['message']}")
+ except Exception as e:
+ logger.error(f"โ Error downloading {file_type} {i+1}: {str(e)}")
+ continue
+
+ return downloaded_pdfs
+
+ except Exception as e:
+ logger.error(f"โ Error extracting PDFs from current page: {str(e)}")
+ return []
+
+async def extract_document_content_unified(page, document_url: str, config: dict, website_type: str = None, pdf_count: int = 0, start_date: str = None, end_date: str = None) -> dict:
+    """
+    Unified function to extract content from a single document (PDF-focused).
+    Navigates to the document with up to 5 retry attempts, then extracts the
+    title, PDF content, and date. Returns a dict with "title", "content",
+    "date", and "url" (plus "pdf_path" for PDF-based sites), or None if the
+    document is filtered out by date range or has no extractable PDF content.
+    """
+ try:
+ # Navigate to document with retry logic (5 attempts)
+ max_retries = 5
+ retry_count = 0
+ page_loaded = False
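+        # Retry strategy: each attempt below changes the wait condition (domcontentloaded -> default load -> networkidle -> plain goto) and shortens the timeout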
+
+ while retry_count < max_retries and not page_loaded:
+ try:
+ retry_count += 1
+ logger.info(f"๐ Loading document (attempt {retry_count}/{max_retries}): {document_url}")
+
+ # Navigate with different strategies based on attempt
+ if retry_count == 1:
+ # First attempt: Use domcontentloaded for faster loading
+ await page.goto(document_url, wait_until="domcontentloaded", timeout=30000)
+ elif retry_count == 2:
+ # Second attempt: Use basic loading
+ await page.goto(document_url, timeout=20000)
+ elif retry_count == 3:
+ # Third attempt: Use networkidle
+ await page.goto(document_url, wait_until="networkidle", timeout=15000)
+ else:
+ # Fourth and fifth attempts: Try with shorter timeouts
+ await page.goto(document_url, timeout=10000)
+
+                logger.info(f"โ Successfully loaded document on attempt {retry_count}")
+ page_loaded = True
+
+ except Exception as e:
+ logger.warning(f"โ ๏ธ Attempt {retry_count} failed for {document_url}: {str(e)}")
+
+ if retry_count >= max_retries:
+ logger.error(f"โ Failed to load document after {max_retries} attempts: {document_url}")
+ return {
+ "title": "Network Error",
+ "content": f"Failed to access document after {max_retries} attempts: {str(e)}",
+ "date": datetime.now().strftime("%Y-%m-%d"),
+ "url": document_url
+ }
+
+ # Wait before retry
+ await asyncio.sleep(2)
+
+ if not page_loaded:
+ return {
+ "title": "Network Error",
+ "content": f"Failed to access document after {max_retries} attempts",
+ "date": datetime.now().strftime("%Y-%m-%d"),
+ "url": document_url
+ }
+
+ # Extract title from page using title selector (priority source)
+ title = ""
+ title_extracted_from_page = False
+
+ # For MOPND, use the title extracted from the main page
+ if website_type == "mopnd" and document_url in mopnd_article_titles:
+ title = mopnd_article_titles[document_url]
+ title_extracted_from_page = True
+            logger.debug(f"โ Using MOPND title from main page: {title}")
+ elif website_type == "mopnd":
+ logger.warning(f"โ ๏ธ MOPND title not found in cache for URL: {document_url}")
+ logger.debug(f"๐ Available titles: {list(mopnd_article_titles.keys())[:3]}")
+ else:
+ # Regular title extraction for other websites using title selector from config
+ title_selector = config.get("title")
+ if title_selector:
+ try:
+ title_element = await page.query_selector(title_selector)
+ if title_element:
+ title = await title_element.text_content()
+ if title:
+ title = title.strip()
+ title_extracted_from_page = True
+                            logger.info(f"โ Extracted title from page using selector '{title_selector}': {title}")
+ else:
+ logger.debug(f"โ ๏ธ Title element found but no text content with selector: {title_selector}")
+ else:
+ logger.debug(f"โ ๏ธ Title element not found with selector: {title_selector}")
+ except Exception as e:
+ logger.warning(f"Error extracting title with selector '{title_selector}': {str(e)}")
+ else:
+ logger.warning("โ ๏ธ No title selector found in config")
+
+ # Use the passed website_type or try to determine it from config
+ if website_type is None:
+ for site_type, site_config in WEBSITE_CONFIG.items():
+ if site_config == config:
+ website_type = site_type
+ break
+ if website_type is None:
+ website_type = "unknown"
+
+ content = ""
+ pdf_path = ""
+
+ # For document-focused sites, check for PDF links
+ # Dynamically determine if this is a PDF website
+ pdf_websites = get_pdf_websites()
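+        # Only site types reported by get_pdf_websites() go through the PDF link extraction below; other sites skip it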
+ if website_type in pdf_websites:
+ pdf_links = []
+ try:
+ # Get PDF selectors from config
+ pdf_links_selector = config.get("pdf_links")
+
+ # Initialize elements list
+ pdf_elements = []
+
+ # Handle different formats in config
+ if isinstance(pdf_links_selector, list):
+ # Process each selector in the array
+ logger.info(f"๐ Processing array of {len(pdf_links_selector)} PDF selectors")
+ for selector in pdf_links_selector:
+ try:
+ elements = await page.query_selector_all(selector)
+ logger.info(f"๐ Found {len(elements)} elements with selector {selector}")
+ pdf_elements.extend(elements)
+ except Exception as e:
+ logger.warning(f"โ Error with selector '{selector}': {str(e)}")
+ elif isinstance(pdf_links_selector, str):
+ # Old format with single string selector
+ logger.info(f"๐ Using string selector: {pdf_links_selector}")
+ pdf_elements = await page.query_selector_all(pdf_links_selector)
+ else:
+ logger.warning("โ ๏ธ No pdf_links selector in config, skipping PDF extraction")
+
+ # Extract PDF URLs and names from elements
+ logger.debug(f"๐ Processing {len(pdf_elements)} PDF elements for {website_type}")
+ for i, element in enumerate(pdf_elements):
+ try:
+ logger.debug(f"๐ Extracting PDF URL from element {i+1}/{len(pdf_elements)}")
+
+ # Get the href attribute, or button-url for FEWS custom elements
+ href = await element.get_attribute("href")
+ if not href:
+ href = await element.get_attribute("button-url")
+ if href:
+ # Convert relative URLs to absolute URLs
+ absolute_url = convert_to_absolute_url(href, page.url)
+
+ # Extract link text for PDF name
+ try:
+ link_text = await element.text_content()
+ pdf_name = link_text.strip() if link_text else ""
+ except Exception as e:
+ logger.debug(f"โ ๏ธ Could not extract link text: {str(e)}")
+ pdf_name = ""
+
+ # If no link text, try to extract filename from URL
+ if not pdf_name:
+ from urllib.parse import unquote
+ url_path = urlparse(absolute_url).path
+ if url_path:
+ pdf_name = unquote(os.path.basename(url_path))
+ # Remove .pdf extension if present
+ if pdf_name.lower().endswith('.pdf'):
+ pdf_name = pdf_name[:-4]
+
+ pdf_links.append({
+ "url": absolute_url,
+ "name": pdf_name
+ })
+ logger.info(f"๐ Found PDF URL: {absolute_url}")
+ if pdf_name:
+ logger.info(f"๐ PDF name: {pdf_name}")
+ else:
+ logger.debug(f"โ ๏ธ No href or button-url attribute found on element {i+1}")
+
+ except Exception as e:
+ logger.warning(f"โ Error extracting PDF URL from element {i+1}: {str(e)}")
+ continue
+ except Exception as e:
+ logger.warning(f"Error extracting PDF links: {str(e)}")
+ pdf_links = []
+
+ if pdf_links:
+ logger.info(f"๐ Found {len(pdf_links)} PDF links, processing...")
+ # Process all PDF links (up to limit)
+ pdf_content_parts = []
+ for i, pdf_info in enumerate(pdf_links):
+                    if MAX_PDF_LIMIT is not None and pdf_count >= MAX_PDF_LIMIT:
+ logger.info(f"๐ Reached PDF limit ({MAX_PDF_LIMIT}), stopping PDF processing")
+ break
+
+ # Handle both old format (string) and new format (dict)
+ if isinstance(pdf_info, dict):
+ pdf_url = pdf_info["url"]
+ pdf_name = pdf_info.get("name", "")
+ else:
+ # Backward compatibility: if it's still a string
+ pdf_url = pdf_info
+ pdf_name = ""
+
+ try:
+ logger.info(f"๐ Processing PDF {i+1}/{len(pdf_links)}: {pdf_url}")
+ if pdf_name:
+ logger.info(f"๐ PDF name: {pdf_name}")
+
+ # First try to download the PDF to get the local path
+ download_result = download_and_save_pdf(pdf_url, website_type)
+ if download_result["success"]:
+ # Set the PDF path to the local downloaded file
+ pdf_path = download_result["path"]
+ logger.info(f"๐ PDF downloaded to: {pdf_path}")
+
+ # Now extract text from the downloaded PDF
+ pdf_content = extract_text_from_pdf_file(pdf_path)
+
+ if pdf_content and len(pdf_content.strip()) > 10:
+ # Use extracted PDF name if available, otherwise use generic label
+ pdf_label = pdf_name if pdf_name else f"PDF {i+1}"
+ pdf_content_parts.append(f"{pdf_label} Content:\n{pdf_content}")
+                                logger.info(f"โ Extracted {len(pdf_content)} characters from {pdf_label}")
+
+ # Only use PDF name as title if page title extraction completely failed
+ # Priority: page title selector > PDF name > PDF content
+ if pdf_name and not title_extracted_from_page and not title:
+ title = pdf_name
+ logger.info(f"๐ Using PDF name as title (page title extraction failed): {title}")
+ else:
+ logger.warning(f"โ ๏ธ No content extracted from PDF {i+1}")
+ else:
+ logger.warning(f"โ Failed to download PDF {i+1}: {download_result['message']}")
+
+ pdf_count += 1
+ logger.info(f"๐ PDF {pdf_count}/{MAX_PDF_LIMIT} processed")
+
+ except Exception as e:
+ logger.warning(f"โ Error processing PDF {i+1}: {str(e)}")
+ continue
+
+ # Combine all PDF content
+ if pdf_content_parts:
+ content = "\n\n".join(pdf_content_parts)
+ logger.info(f"๐ Combined PDF content: {len(content)} characters total")
+
+ # Only extract title from PDF content as absolute last resort
+ # Priority: page title selector > PDF name > PDF content
+ if not title_extracted_from_page and not title and content and len(content) > 50:
+ lines = content.split('\n')[:5]
+ for line in lines:
+ if line.strip() and len(line.strip()) > 10 and len(line.strip()) < 100:
+ title = line.strip()
+ logger.info(f"๐ Using title extracted from PDF content (page title extraction failed): {title}")
+ break
+ else:
+ logger.warning("โ ๏ธ No PDF content extracted, skipping document")
+ content = ""
+ else:
+ # No PDF links found, skip document
+ logger.info("๐ No PDF links found, skipping document")
+ content = ""
+
+ # Extract date using configuration selector
+ date_raw = ""
+
+ # For MOPND, use the date extracted from the main page
+ if website_type == "mopnd" and document_url in mopnd_article_dates:
+ date_raw = mopnd_article_dates[document_url]
+            logger.debug(f"โ Using MOPND date from main page: {date_raw}")
+ elif website_type == "mopnd":
+ logger.warning(f"โ ๏ธ MOPND date not found in cache for URL: {document_url}")
+ logger.debug(f"๐ Available dates: {list(mopnd_article_dates.keys())[:3]}")
+ else:
+ # Regular date extraction for other websites
+ date_selector = config.get("date")
+
+ if date_selector:
+ try:
+ date_element = await page.query_selector(date_selector)
+ if date_element:
+ date_raw = await date_element.text_content()
+ if date_raw:
+ date_raw = date_raw.strip()
+                            logger.debug(f"โ Extracted raw date: {date_raw}")
+ except Exception as e:
+ logger.warning(f"Error extracting date with selector {date_selector}: {str(e)}")
+
+ # Standardize the date to YYYY-MM-DD format
+ date = standardize_date(date_raw, default_to_current=True)
+ if not date:
+ date = datetime.now().strftime("%Y-%m-%d")
+            logger.info(f"No date found for document, using current date: {date}")
+
+ # Check date range filtering
+ if start_date or end_date:
+ start_dt = parse_date_input(start_date) if start_date else None
+ end_dt = parse_date_input(end_date) if end_date else None
+ if not is_date_in_range(date, start_dt, end_dt, include_missing=False):
+                logger.info(f"๐ Document date {date} is outside date range [{start_date}, {end_date}] - filtering out")
+ return None
+
+ # Skip documents with no content (for PDF-based sites)
+ # Dynamically determine if this is a PDF website
+ pdf_websites = get_pdf_websites()
+ if website_type in pdf_websites:
+ if not content or len(content.strip()) < 10:
+ logger.info(f"๐ Skipping document with no PDF content: {document_url}")
+ return None
+
+ result = {
+ "title": title or "No title found",
+ "content": content or "No content found",
+ "date": date,
+ "url": document_url
+ }
+
+ # Add PDF path for PDF-based sites
+ # Dynamically determine if this is a PDF website
+ pdf_websites = get_pdf_websites()
+ if website_type in pdf_websites:
+ if pdf_path:
+ result["pdf_path"] = pdf_path
+ logger.info(f"๐ Added PDF path to result: {pdf_path}")
+ else:
+ logger.warning("โ ๏ธ No PDF path available for PDF-based site")
+
+ return result
+
+ except Exception as e:
+ logger.error(f"Error extracting content from {document_url}: {str(e)}")
+ return {
+ "title": "Error",
+ "content": f"Error extracting content: {str(e)}",
+ "date": datetime.now().strftime("%Y-%m-%d"),
+ "url": document_url
+ }