"""
Document Scraper - Handles PDF and document processing
"""
import asyncio
import json
import logging
import os
import hashlib
import tempfile
import requests
import urllib3
from datetime import datetime
from typing import List, Dict, Any, Optional
from urllib.parse import urlparse, urlunparse, unquote
# Import common functions from scraper_common
from scraper_common import (
WEBSITE_CONFIG, MAX_PDF_LIMIT, MAX_ARTICLE_LIMIT, MAX_PAGE_LIMIT,
ensure_archive_directory, convert_to_absolute_url,
set_scraping_cancelled, scraping_cancelled, force_close_browser,
reset_global_pdf_count, increment_global_pdf_count, get_global_pdf_count, is_pdf_limit_reached,
get_pdf_websites
)
# Import date filtering utilities
from date_filter import is_date_in_range, parse_date_input, standardize_date
# Suppress SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)
logger = logging.getLogger(__name__)
def construct_navigation_url(base_url: str, nav_addition: str) -> str:
"""
Construct navigation URL by properly handling trailing slashes and query parameters
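Illustrative examples (example.org is a placeholder):

>>> construct_navigation_url("https://example.org/news/", "?page=2")
'https://example.org/news?page=2'
>>> construct_navigation_url("https://example.org/news", "page/2")
'https://example.org/news/page/2'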
"""
# Remove trailing slash from base URL if it exists
if base_url.endswith('/'):
base_url = base_url.rstrip('/')
# Check if nav_addition starts with / or ?
if nav_addition.startswith('/'):
# Direct path addition
return base_url + nav_addition
elif nav_addition.startswith('?'):
# Query parameter addition
return base_url + nav_addition
else:
# Default: add as path
return base_url + '/' + nav_addition
# Global variables for document processing
mopnd_article_dates = {}
mopnd_article_titles = {}
def clear_mopnd_cache():
"""Clear MOPND article cache when starting a new scraping session"""
global mopnd_article_dates, mopnd_article_titles
mopnd_article_dates.clear()
mopnd_article_titles.clear()
logger.info("๐งน Cleared MOPND article cache")
def get_pdf_hash(pdf_url: str) -> str:
"""Generate a hash for the PDF URL to use as cache key"""
return hashlib.md5(pdf_url.encode()).hexdigest()
def is_pdf_archived(pdf_url: str, source: str) -> bool:
"""Check if PDF is already archived"""
ensure_archive_directory()
hash_key = get_pdf_hash(pdf_url)
archive_dir = f"archive/{source}"
date_folder = datetime.now().strftime("%Y-%m-%d")
archive_path = f"{archive_dir}/{date_folder}"
if os.path.exists(archive_path):
for file in os.listdir(archive_path):
if file.startswith(hash_key):
return True
return False
def get_archived_pdf_path(pdf_url: str, source: str) -> Optional[str]:
"""Get the archived PDF file path, or None if the PDF is not archived"""
ensure_archive_directory()
hash_key = get_pdf_hash(pdf_url)
archive_dir = f"archive/{source}"
date_folder = datetime.now().strftime("%Y-%m-%d")
archive_path = f"{archive_dir}/{date_folder}"
if os.path.exists(archive_path):
for file in os.listdir(archive_path):
if file.startswith(hash_key):
return os.path.join(archive_path, file)
return None
def archive_pdf(pdf_url: str, content: bytes, source: str) -> str:
"""Archive PDF content and return the local file path"""
logger.info(f"๐พ Starting PDF archiving process...")
ensure_archive_directory()
# Create source-specific archive directory
archive_dir = f"archive/{source}"
date_folder = datetime.now().strftime("%Y-%m-%d")
archive_path = f"{archive_dir}/{date_folder}"
# Create directory if it doesn't exist
os.makedirs(archive_path, exist_ok=True)
# Generate unique filename using hash
hash_key = get_pdf_hash(pdf_url)
filename = f"{hash_key}.pdf"
file_path = os.path.join(archive_path, filename)
# Save PDF content
with open(file_path, 'wb') as f:
f.write(content)
logger.info(f"๐ PDF archived to: {file_path}")
# Update archive index
update_archive_index(pdf_url, file_path, source)
return file_path
def archive_file(file_url: str, content: bytes, source: str, file_extension: str = "csv") -> str:
"""Archive file content (CSV, etc.) and return the local file path"""
logger.info(f"๐พ Starting file archiving process for {file_extension.upper()}...")
ensure_archive_directory()
# Create source-specific archive directory
archive_dir = f"archive/{source}"
date_folder = datetime.now().strftime("%Y-%m-%d")
archive_path = f"{archive_dir}/{date_folder}"
# Create directory if it doesn't exist
os.makedirs(archive_path, exist_ok=True)
# Generate unique filename using hash
hash_key = get_pdf_hash(file_url)
filename = f"{hash_key}.{file_extension}"
file_path = os.path.join(archive_path, filename)
# Save file content
with open(file_path, 'wb') as f:
f.write(content)
logger.info(f"๐ File archived to: {file_path}")
# Update archive index
update_archive_index(file_url, file_path, source)
return file_path
def update_archive_index(pdf_url: str, local_path: str, source: str):
"""Update the archive index with PDF information"""
ensure_archive_directory()
index_file = f"archive/{source}/index.json"
# Load existing index or create new one
if os.path.exists(index_file):
try:
with open(index_file, 'r') as f:
index = json.load(f)
except (json.JSONDecodeError, OSError):
index = {}
else:
index = {}
# Add new entry
hash_key = get_pdf_hash(pdf_url)
index[hash_key] = {
"url": pdf_url,
"local_path": local_path,
"source": source,
"archived_date": datetime.now().isoformat()
}
# Save updated index
with open(index_file, 'w') as f:
json.dump(index, f, indent=2)
def download_and_save_pdf(pdf_url: str, source: str = "unknown") -> dict:
"""
Download PDF and save it to archive, return metadata
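Returns a dict with keys "success", "path", "size", and "message" (see the return
statements below); on failure "path" is None and "size" is 0.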
"""
try:
logger.info(f"โฌ๏ธ Downloading PDF: {pdf_url}")
logger.info(f"๐ Source: {source}")
# Check if PDF is already archived
if is_pdf_archived(pdf_url, source):
logger.info(f"โ
PDF already archived: {pdf_url}")
cached_path = get_archived_pdf_path(pdf_url, source)
return {
"success": True,
"path": cached_path,
"size": os.path.getsize(cached_path),
"message": "PDF already archived"
}
# Create headers to mimic a browser request
parsed_url = urlparse(pdf_url)
base_domain = f"{parsed_url.scheme}://{parsed_url.netloc}"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Connection": "keep-alive",
"Referer": base_domain
}
logger.info(f"๐ Using base domain as referer: {base_domain}")
# Try direct download with headers first
try:
session = requests.Session()
# Disable SSL verification for problematic certificates
session.verify = False
# First, visit the domain homepage to get cookies
session.get(base_domain, headers=headers, timeout=30, verify=False)
logger.info(f"๐ช Visited domain homepage to gather cookies")
# Then try to download the PDF with proper headers
response = session.get(pdf_url, headers=headers, timeout=30, verify=False)
response.raise_for_status()
logger.info(f"โ
PDF downloaded successfully. Size: {len(response.content)} bytes")
except Exception as e:
logger.error(f"โ Error downloading PDF: {str(e)}")
raise
# Archive the PDF
archived_path = archive_pdf(pdf_url, response.content, source)
logger.info(f"๐ PDF archived to: {archived_path}")
return {
"success": True,
"path": archived_path,
"size": len(response.content),
"message": "PDF downloaded and archived successfully"
}
except Exception as e:
# Direct download failed, return error without fallback
logger.error(f"โ PDF download failed for {pdf_url}: {str(e)}")
return {
"success": False,
"path": None,
"size": 0,
"message": f"Error downloading PDF: {str(e)}"
}
def download_and_save_file(file_url: str, source: str = "unknown", file_type: str = "csv") -> dict:
"""
Download file (CSV, etc.) and save it to archive, return metadata
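Returns a dict with keys "success", "path", "size", "file_type", and "message",
mirroring download_and_save_pdf().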
"""
try:
logger.info(f"โฌ๏ธ Downloading {file_type.upper()}: {file_url}")
logger.info(f"๐ Source: {source}")
# Determine file extension
file_extension = file_type.lower()
if file_extension not in ["csv", "xlsx", "xls", "png", "jpg", "jpeg", "gif", "webp"]:
# Try to determine from URL if not in known types
if file_url.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
file_extension = file_url.lower().split('.')[-1]
else:
file_extension = "csv" # Default to CSV
# Check if file is already archived (using same hash mechanism as PDFs)
if is_pdf_archived(file_url, source):
logger.info(f"โ
File already archived: {file_url}")
cached_path = get_archived_pdf_path(file_url, source)
# Check if the cached file has the right extension
if cached_path and os.path.exists(cached_path):
return {
"success": True,
"path": cached_path,
"size": os.path.getsize(cached_path),
"file_type": file_type,
"message": "File already archived"
}
# Create headers to mimic a browser request
parsed_url = urlparse(file_url)
base_domain = f"{parsed_url.scheme}://{parsed_url.netloc}"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Connection": "keep-alive",
"Referer": base_domain
}
logger.info(f"๐ Using base domain as referer: {base_domain}")
# Try direct download with headers first
try:
session = requests.Session()
# Disable SSL verification for problematic certificates
session.verify = False
# First, visit the domain homepage to get cookies
session.get(base_domain, headers=headers, timeout=30, verify=False)
logger.info(f"๐ช Visited domain homepage to gather cookies")
# Then try to download the file with proper headers
response = session.get(file_url, headers=headers, timeout=30, verify=False)
response.raise_for_status()
logger.info(f"โ
{file_type.upper()} downloaded successfully. Size: {len(response.content)} bytes")
except Exception as e:
logger.error(f"โ Error downloading {file_type.upper()}: {str(e)}")
raise
# Archive the file
archived_path = archive_file(file_url, response.content, source, file_extension)
logger.info(f"๐ {file_type.upper()} archived to: {archived_path}")
return {
"success": True,
"path": archived_path,
"size": len(response.content),
"file_type": file_type,
"message": f"{file_type.upper()} downloaded and archived successfully"
}
except Exception as e:
# Direct download failed, return error without fallback
logger.error(f"โ {file_type.upper()} download failed for {file_url}: {str(e)}")
return {
"success": False,
"path": None,
"size": 0,
"file_type": file_type,
"message": f"Error downloading {file_type.upper()}: {str(e)}"
}
def get_website_type_from_source(source: str) -> str:
"""
Map source name to website type for config lookup
"""
source_to_type = {
"FS Cluster": "fscluster",
"ReliefWeb": "reliefweb",
"NBS Somalia": "nbs",
"HDX": "hdx",
"HDX Humanitarian Data Exchange": "hdx",
"LogCluster": "logcluster",
"FSNau": "fsnau",
"FSNau - Food Security and Nutrition Analysis Unit": "fsnau",
"FSNau Publications": "fsnau_publications",
"FEWS NET": "fews",
"FEWS NET - Famine Early Warning Systems Network": "fews",
"ICPAC": "icpac",
"ICPAC - IGAD Climate Prediction and Applications Centre": "icpac",
"ICPAC - IGAD Climate Prediction and Applications Centre - Seasonal Forecast": "icpac_seasonal_forecast",
"FAO SWALIM": "faoswalim",
"FAO SWALIM Publications": "faoswalim_publications",
"FAO SWALIM Journals": "faoswalim_journals",
"FAO SWALIM Events": "faoswalim_events",
"FAO SWALIM Articles": "faoswalim_articles",
"FAO SWALIM Flood Watch": "faoswalim_flood_watch",
"FAO SWALIM Water Publications": "faoswalim_water_publications",
"MOPND Somaliland": "mopnd",
"Copernicus Drought Observatory": "copernicus_drought",
"fscluster": "fscluster",
"reliefweb": "reliefweb",
"NBS": "nbs",
"HDX": "hdx",
"LogCluster": "logcluster",
"FSNau": "fsnau",
"FSNau Publications": "fsnau_publications",
"FEWS NET": "fews",
"ICPAC": "icpac",
"FAO SWALIM": "faoswalim"
}
return source_to_type.get(source, "fscluster") # Default fallback
def extract_pdf_text(pdf_url: str, source: str = "unknown") -> str:
"""
Extract text content from archived PDF using multiple methods
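Relative URLs are resolved against the configured base_url for the source, the
archived copy is used when available (downloading on demand otherwise), and
failures are reported as a human-readable string instead of raising.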
"""
try:
logger.info(f"๐ Starting PDF text extraction for URL: {pdf_url}")
logger.info(f"๐ Source: {source}")
# Check if URL is relative and convert to absolute URL
parsed_url = urlparse(pdf_url)
# If the URL is relative (no scheme/netloc), we need to construct complete URL
if not parsed_url.scheme and pdf_url.startswith('/'):
# Get website type from source and lookup base_url from config
website_type = get_website_type_from_source(source)
config = WEBSITE_CONFIG.get(website_type, {})
base_url = config.get('base_url', 'https://fscluster.org') # Default fallback
logger.info(f"๐ Using base_url from config for {website_type}: {base_url}")
# Construct complete URL
complete_url = f"{base_url}{pdf_url}"
logger.info(f"๐ Converted relative URL {pdf_url} to absolute URL: {complete_url}")
pdf_url = complete_url
# Get archived PDF path
if is_pdf_archived(pdf_url, source):
cached_path = get_archived_pdf_path(pdf_url, source)
logger.info(f"๐ Using archived PDF: {cached_path}")
result = extract_text_from_pdf_file(cached_path)
logger.info(f"๐ Extracted text length: {len(result)} characters")
if not result.strip():
logger.warning("โ ๏ธ No text extracted from PDF - might be image-based or corrupted")
else:
logger.info(f"โ
Successfully extracted text from PDF")
return result
else:
# Try to download the PDF first if not in archive
logger.info(f"โ PDF not found in archive: {pdf_url}")
logger.info(f"โฌ๏ธ Attempting to download PDF now...")
# Attempt the download
download_result = download_and_save_pdf(pdf_url, source)
if download_result["success"]:
logger.info(f"โ
Successfully downloaded PDF: {download_result['path']}")
# Now extract text from the newly downloaded PDF
result = extract_text_from_pdf_file(download_result["path"])
return result
else:
logger.error(f"โ Failed to download PDF: {download_result['message']}")
# Special failure message for fscluster
if source.lower() == "fscluster" and "403" in download_result["message"]:
return f"PDF download blocked by fscluster.org (403 Forbidden). Try visiting the document page first in your browser before scraping, or use authenticated session cookies: {pdf_url}"
else:
return f"PDF not found in archive and download failed: {pdf_url}"
except Exception as e:
logger.error(f"โ Error extracting PDF text from {pdf_url}: {str(e)}")
return f"Error extracting PDF: {str(e)}"
def extract_text_from_pdf_file(pdf_file_or_path):
"""
Extract text from PDF using multiple methods for better compatibility
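Accepts either a file path (str) or a BytesIO-like object. Extraction methods are
tried in order - pypdf, pdfplumber, PyMuPDF (fitz), then OCR via
pytesseract/pdf2image when little or no text has been recovered - and the first
non-empty result is returned.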
"""
text_content = ""
try:
logger.info(f"๐ Starting PDF text extraction...")
# Method 1: Try pypdf first (most reliable for text-based PDFs)
try:
logger.info(f"๐ Trying pypdf extraction...")
import pypdf
if isinstance(pdf_file_or_path, str):
# File path
logger.info(f"๐ Reading from file path: {pdf_file_or_path}")
with open(pdf_file_or_path, 'rb') as file:
pdf_reader = pypdf.PdfReader(file)
logger.info(f"๐ PDF has {len(pdf_reader.pages)} pages")
for i, page in enumerate(pdf_reader.pages):
page_text = page.extract_text()
if page_text:
text_content += page_text + "\n"
else:
# BytesIO objects
logger.info(f"๐ Reading from BytesIO object")
pdf_reader = pypdf.PdfReader(pdf_file_or_path)
logger.info(f"๐ PDF has {len(pdf_reader.pages)} pages")
for i, page in enumerate(pdf_reader.pages):
page_text = page.extract_text()
if page_text:
text_content += page_text + "\n"
if text_content.strip():
logger.info(f"โ
Successfully extracted text using pypdf: {len(text_content)} characters")
return text_content.strip()
else:
logger.warning("โ ๏ธ pypdf extracted no text")
except Exception as e:
logger.warning(f"โ ๏ธ pypdf extraction failed: {str(e)}")
# Method 2: Try pdfplumber (better for complex layouts)
try:
logger.info(f"๐ Trying pdfplumber extraction...")
import pdfplumber
if isinstance(pdf_file_or_path, str):
with pdfplumber.open(pdf_file_or_path) as pdf:
logger.info(f"๐ PDF has {len(pdf.pages)} pages")
for i, page in enumerate(pdf.pages):
page_text = page.extract_text()
if page_text:
text_content += page_text + "\n"
else:
# For BytesIO objects, we need to save to temp file first
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
temp_file.write(pdf_file_or_path.getvalue())
temp_file.flush()
with pdfplumber.open(temp_file.name) as pdf:
logger.info(f"๐ PDF has {len(pdf.pages)} pages")
for i, page in enumerate(pdf.pages):
page_text = page.extract_text()
if page_text:
text_content += page_text + "\n"
# Clean up temp file
os.unlink(temp_file.name)
logger.info(f"๐๏ธ Temp file cleaned up")
if text_content.strip():
logger.info(f"โ
Successfully extracted text using pdfplumber: {len(text_content)} characters")
return text_content.strip()
else:
logger.warning("โ ๏ธ pdfplumber extracted no text")
except ImportError:
logger.warning("โ ๏ธ pdfplumber not available")
except Exception as e:
logger.warning(f"โ ๏ธ pdfplumber extraction failed: {str(e)}")
# Method 3: Try PyMuPDF (fitz) for better text extraction
try:
logger.info(f"๐ Trying PyMuPDF extraction...")
import fitz # PyMuPDF
if isinstance(pdf_file_or_path, str):
doc = fitz.open(pdf_file_or_path)
else:
doc = fitz.open(stream=pdf_file_or_path.getvalue(), filetype="pdf")
logger.info(f"๐ PDF has {doc.page_count} pages")
for page_num in range(doc.page_count):
page = doc.load_page(page_num)
page_text = page.get_text()
if page_text:
text_content += page_text + "\n"
doc.close()
if text_content.strip():
logger.info(f"โ
Successfully extracted text using PyMuPDF: {len(text_content)} characters")
return text_content.strip()
else:
logger.warning("โ ๏ธ PyMuPDF extracted no text")
except ImportError:
logger.warning("โ ๏ธ PyMuPDF not available")
except Exception as e:
logger.warning(f"โ ๏ธ PyMuPDF extraction failed: {str(e)}")
# Try one more advanced method for text-within-images using OCR
# This is especially helpful for LogCluster PDFs which often have text embedded in images
if not text_content.strip() or len(text_content.strip()) < 500: # If no text or very little text extracted
try:
logger.info(f"๐ Trying OCR extraction as last resort...")
import pytesseract
from PIL import Image
from pdf2image import convert_from_path, convert_from_bytes
if isinstance(pdf_file_or_path, str):
# Convert PDF to images
images = convert_from_path(pdf_file_or_path, dpi=300)
else:
# For BytesIO objects
images = convert_from_bytes(pdf_file_or_path.getvalue(), dpi=300)
logger.info(f"๐ผ๏ธ Converted PDF to {len(images)} images for OCR")
for i, image in enumerate(images):
# Extract text using OCR
page_text = pytesseract.image_to_string(image, lang='eng')
if page_text.strip():
text_content += f"Page {i+1} (OCR):\n{page_text}\n"
logger.info(f"๐ OCR extracted {len(page_text)} characters from page {i+1}")
if text_content.strip():
logger.info(f"โ
Successfully extracted text using OCR: {len(text_content)} characters")
return text_content.strip()
else:
logger.warning("โ ๏ธ OCR extracted no text")
except ImportError:
logger.warning("โ ๏ธ OCR libraries not available (pytesseract, pdf2image)")
except Exception as e:
logger.warning(f"โ OCR extraction failed: {str(e)}")
# If we got some text content from earlier methods, return it even if it's partial
if text_content.strip():
logger.info(f"โ ๏ธ Returning partial text extraction ({len(text_content.strip())} characters)")
return text_content.strip()
# If all methods fail, return a message
logger.warning("โ All PDF extraction methods failed")
return "PDF text extraction failed - document may be image-based or corrupted"
except Exception as e:
logger.error(f"โ Error in PDF text extraction: {str(e)}")
return f"PDF text extraction failed: {str(e)}"
async def download_all_pdfs_from_page(page, url: str, config: dict, source: str, start_date: str = None, end_date: str = None) -> List[dict]:
"""
Download all PDFs from multiple pages with pagination support
Supports both approaches:
1. Direct PDF discovery (pdf_links only)
2. Page links first, then PDF discovery (page_links + pdf_links)
"""
try:
logger.info(f"๐ Starting PDF download from page: {url}")
logger.info(f"๐ Source: {source}")
# Clear MOPND cache if this is a MOPND scraping session
if source == "mopnd":
clear_mopnd_cache()
# Reset global PDF counter at the start of processing
reset_global_pdf_count()
logger.info(f"๐ Reset global PDF counter. Limit: {MAX_PDF_LIMIT}")
# Check for special table extraction mode
extract_table_as_csv = config.get("extract_table_as_csv", False)
if extract_table_as_csv:
logger.info("๐ Using table extraction mode: Extract table data and convert to CSV")
return await extract_table_as_csv_file(page, url, config, source, start_date, end_date)
# Determine which approach to use
page_links_selector = config.get("page_links")
pdf_links_selector = config.get("pdf_links")
file_links_selector = config.get("file_links")
# Debug logging
logger.debug(f"๐ Config check for source '{source}': page_links={page_links_selector}, pdf_links={pdf_links_selector}, file_links={file_links_selector}")
# If page_links is configured and not null/empty, use Approach 2
# This allows us to navigate to individual pages and extract PDFs from each
if page_links_selector and pdf_links_selector:
# Approach 2: Page links first, then PDF discovery
logger.info("๐ Using Approach 2: Page links first, then PDF discovery")
return await download_pdfs_via_page_links(page, url, config, source, start_date, end_date)
elif page_links_selector and file_links_selector:
# Approach 2: Page links first, then file discovery
logger.info("๐ Using Approach 2: Page links first, then file discovery")
return await download_pdfs_via_page_links(page, url, config, source, start_date, end_date)
elif pdf_links_selector or file_links_selector:
# Approach 1: Direct PDF/file discovery
logger.info("๐ Using Approach 1: Direct PDF/file discovery")
return await download_pdfs_direct(page, url, config, source, start_date, end_date)
else:
logger.error("โ No pdf_links, file_links, or page_links configured")
return []
except Exception as e:
logger.error(f"โ Error downloading PDFs from pages: {str(e)}")
return []
async def extract_table_as_csv_file(page, url: str, config: dict, source: str, start_date: str = None, end_date: str = None) -> List[dict]:
"""
Special function to extract table data and convert to CSV
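Table cells matched by config["content"] (expected to be "td, th") are grouped back
into rows via <tr> elements when possible, written to a CSV file in the archive, and
returned as a single document entry.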
"""
try:
logger.info(f"๐ Starting table extraction from page: {url}")
logger.info(f"๐ Source: {source}")
# Navigate to the page
await page.goto(url, wait_until="domcontentloaded", timeout=30000)
# Get content selector (should be "td, th" for table cells)
content_selector = config.get("content")
if not content_selector:
logger.error("โ No content selector configured for table extraction")
return []
logger.info(f"๐ Extracting table data using selector: {content_selector}")
# Extract all table cells (td and th)
cell_elements = await page.query_selector_all(content_selector)
logger.info(f"๐ Found {len(cell_elements)} table cells")
if not cell_elements:
logger.warning("โ ๏ธ No table cells found")
return []
# Extract text from all cells
cells_data = []
for element in cell_elements:
try:
cell_text = await element.text_content()
if cell_text:
cells_data.append(cell_text.strip())
else:
cells_data.append("")
except Exception as e:
logger.debug(f"โ ๏ธ Error extracting cell text: {str(e)}")
cells_data.append("")
# Try to find the table structure to organize data into rows
# First, try to find all table rows
table_rows = []
try:
# Try to find table rows
row_elements = await page.query_selector_all("tr")
if row_elements:
logger.info(f"๐ Found {len(row_elements)} table rows")
for row_element in row_elements:
row_cells = await row_element.query_selector_all("td, th")
row_data = []
for cell in row_cells:
try:
cell_text = await cell.text_content()
row_data.append(cell_text.strip() if cell_text else "")
except:
row_data.append("")
if row_data: # Only add non-empty rows
table_rows.append(row_data)
except Exception as e:
logger.warning(f"โ ๏ธ Could not extract table rows: {str(e)}")
# Fallback: organize cells into rows based on a reasonable assumption
# If we can't find rows, we'll create a single row with all cells
if cells_data:
table_rows = [cells_data]
if not table_rows:
logger.warning("โ ๏ธ No table rows extracted")
return []
# Convert to CSV format
import csv
import io
csv_buffer = io.StringIO()
csv_writer = csv.writer(csv_buffer)
# Write all rows to CSV
for row in table_rows:
csv_writer.writerow(row)
csv_content = csv_buffer.getvalue()
csv_buffer.close()
logger.info(f"๐ Generated CSV content: {len(csv_content)} characters, {len(table_rows)} rows")
# Generate filename
from datetime import datetime
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"river_levels_{timestamp}.csv"
# Save CSV file to archive
csv_bytes = csv_content.encode('utf-8')
csv_file_path = archive_file(url, csv_bytes, source, "csv")
logger.info(f"๐ CSV file saved to: {csv_file_path}")
# Create document entry
document = {
"url": url,
"local_path": csv_file_path,
"size": len(csv_bytes),
"title": f"River Levels Data - {datetime.now().strftime('%Y-%m-%d')}",
"source": source,
"extracted_text": f"CSV File: {filename}\nFile Path: {csv_file_path}\nTotal Rows: {len(table_rows)}\n\nPreview:\n{csv_content[:500]}...",
"file_type": "CSV",
"date": datetime.now().strftime("%Y-%m-%d")
}
# Increment global PDF counter (using same counter for files)
increment_global_pdf_count()
logger.info(f"โ
Successfully extracted table data and saved as CSV")
return [document]
except Exception as e:
logger.error(f"โ Error extracting table as CSV: {str(e)}")
return []
async def download_pdfs_direct(page, url: str, config: dict, source: str, start_date: str = None, end_date: str = None) -> List[dict]:
"""
Approach 1: Direct PDF discovery on listing pages
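Pagination stops when MAX_PAGE_LIMIT is exceeded, when the navigation element
disappears, after two consecutive pages yield nothing new, or when the global PDF
limit is reached.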
"""
try:
# Check if navigation is configured
navigation_selector = config.get("navigation_selector")
navigation_url_addition = config.get("navigation_url_addition")
start_page = config.get("start_page", 1)
all_pdfs = []
seen_pdf_urls = set() # Track unique PDF URLs to detect duplicates
current_page = start_page
consecutive_empty_pages = 0
max_consecutive_empty = 2 # Stop after 2 consecutive pages with no new content
# Navigate to the initial page
await page.goto(url, wait_until="domcontentloaded", timeout=30000)
# Handle pagination if configured
if navigation_selector and navigation_url_addition:
logger.info(f"๐งญ Navigation configured: selector={navigation_selector}, url_addition={navigation_url_addition}")
logger.info(f"๐ Starting from page: {start_page}")
while True:
logger.info(f"๐ Processing page {current_page}")
# Check MAX_PAGE_LIMIT if set
if MAX_PAGE_LIMIT is not None and current_page > MAX_PAGE_LIMIT:
logger.info(f"๐ Reached MAX_PAGE_LIMIT ({MAX_PAGE_LIMIT}), stopping pagination")
break
# Navigate to current page if not the first page
if current_page > start_page:
nav_url_addition = navigation_url_addition.replace("{page_no}", str(current_page))
nav_url = construct_navigation_url(url, nav_url_addition)
logger.info(f"๐งญ Navigating to: {nav_url}")
await page.goto(nav_url, wait_until="domcontentloaded", timeout=30000)
# Check for recaptcha and wait if present
captcha_result = await check_and_wait_for_recaptcha(page, config)
if captcha_result == "CAPTCHA_TIMEOUT":
logger.error("โ Captcha detected but not solved within timeout period")
return []
# Check if navigation element exists for next page
nav_element = await page.query_selector(navigation_selector)
if current_page == start_page and nav_element:
logger.info("โ
Navigation element found, more pages available")
elif current_page > start_page and not nav_element:
logger.info("๐ No more navigation elements found, stopping pagination")
break
# Check global PDF limit before processing page
if is_pdf_limit_reached():
logger.info(f"๐ Global PDF limit reached ({MAX_PDF_LIMIT}), stopping pagination")
break
# Extract PDFs from current page
page_pdfs = await extract_pdfs_from_current_page(page, config, source, start_date, end_date)
if page_pdfs:
# Check for new (non-duplicate) PDFs
new_pdfs = []
for pdf in page_pdfs:
pdf_url = pdf.get("url", "")
if pdf_url and pdf_url not in seen_pdf_urls:
seen_pdf_urls.add(pdf_url)
new_pdfs.append(pdf)
if new_pdfs:
all_pdfs.extend(new_pdfs)
consecutive_empty_pages = 0 # Reset counter
logger.info(f"๐ Found {len(new_pdfs)} new PDFs on page {current_page} (total: {len(page_pdfs)} PDFs on page)")
else:
consecutive_empty_pages += 1
logger.info(f"๐ No new PDFs found on page {current_page} (all {len(page_pdfs)} PDFs were duplicates)")
# Stop if we've had too many consecutive pages with no new content
if consecutive_empty_pages >= max_consecutive_empty:
logger.info(f"๐ Stopping pagination: {consecutive_empty_pages} consecutive pages with no new content")
break
else:
consecutive_empty_pages += 1
logger.info(f"๐ No PDFs found on page {current_page}")
# Stop if we've had too many consecutive pages with no content
if consecutive_empty_pages >= max_consecutive_empty:
logger.info(f"๐ Stopping pagination: {consecutive_empty_pages} consecutive pages with no content")
break
current_page += 1
else:
# No pagination configured, scrape single page only
logger.info("๐ No navigation configured - scraping single page only")
page_pdfs = await extract_pdfs_from_current_page(page, config, source, start_date, end_date)
all_pdfs.extend(page_pdfs)
logger.info(f"๐ Total unique PDFs found across all pages: {len(all_pdfs)}")
return all_pdfs
except Exception as e:
logger.error(f"โ Error in direct PDF discovery: {str(e)}")
return []
async def download_pdfs_via_page_links(page, url: str, config: dict, source: str, start_date: str = None, end_date: str = None) -> List[dict]:
"""
Approach 2: Page links first, then PDF discovery
1. Go through pagination to collect all page links
2. Visit each individual page link
3. Find and download PDFs from each page
"""
try:
logger.info("๐ Starting Approach 2: Page links first, then PDF discovery")
# Step 1: Collect all page links through pagination
logger.info("๐ Step 1: Collecting all page links through pagination")
all_page_links = await collect_all_page_links(page, url, config, source)
if not all_page_links:
logger.warning("โ ๏ธ No page links found")
return []
logger.info(f"๐ Collected {len(all_page_links)} page links")
# Step 2: Visit each page link and extract PDFs
logger.info("๐ Step 2: Visiting individual pages to find PDFs")
all_pdfs = []
seen_pdf_urls = set()
for i, page_url in enumerate(all_page_links, 1):
if scraping_cancelled():
logger.info("๐ Scraping cancelled, stopping PDF downloads")
break
# Check global PDF limit before processing page
if is_pdf_limit_reached():
logger.info(f"๐ Global PDF limit reached ({MAX_PDF_LIMIT}), stopping page processing")
break
logger.info(f"๐ Processing page {i}/{len(all_page_links)}: {page_url}")
logger.info(f"๐ Global PDF count: {get_global_pdf_count()}/{MAX_PDF_LIMIT}")
try:
# Navigate to the individual page
await page.goto(page_url, wait_until="domcontentloaded", timeout=30000)
# Check for recaptcha and wait if present
captcha_result = await check_and_wait_for_recaptcha(page, config)
if captcha_result == "CAPTCHA_TIMEOUT":
logger.error("โ Captcha detected but not solved within timeout period")
return [{
"title": "CAPTCHA_ERROR",
"content": "Captcha detected. Please try again later. The website requires captcha verification which could not be completed automatically.",
"date": datetime.now().strftime("%Y-%m-%d"),
"url": page_url
}]
# Extract title from this individual page using title selector (for Approach 2)
page_title = ""
# For MOPND, use the cached title from the listing page
if source == "mopnd":
# Try exact match first
if page_url in mopnd_article_titles:
page_title = mopnd_article_titles[page_url]
logger.info(f"๐ Using MOPND cached title from listing page: {page_title}")
else:
# Try to find a matching URL (handle query params, trailing slashes)
page_url_parsed = urlparse(page_url)
page_url_normalized = urlunparse((page_url_parsed.scheme, page_url_parsed.netloc, page_url_parsed.path, '', '', ''))
# Try normalized URL
matching_url = None
for cached_url in mopnd_article_titles.keys():
cached_parsed = urlparse(cached_url)
cached_normalized = urlunparse((cached_parsed.scheme, cached_parsed.netloc, cached_parsed.path, '', '', ''))
if cached_normalized == page_url_normalized:
matching_url = cached_url
break
if matching_url:
page_title = mopnd_article_titles[matching_url]
logger.info(f"๐ Using MOPND cached title (matched normalized URL): {page_title}")
else:
logger.warning(f"โ ๏ธ MOPND title not found in cache for URL: {page_url}")
logger.debug(f"๐ Available URLs in cache: {list(mopnd_article_titles.keys())[:3]}")
else:
# For other sites, extract title from individual page
title_selector = config.get("title")
if title_selector:
try:
title_element = await page.query_selector(title_selector)
if title_element:
page_title = await title_element.text_content()
if page_title:
page_title = page_title.strip()
logger.info(f"๐ Extracted title from page: {page_title}")
else:
logger.debug(f"โ ๏ธ Title element found but no text content")
else:
logger.debug(f"โ ๏ธ Title element not found with selector: {title_selector}")
except Exception as e:
logger.warning(f"โ ๏ธ Error extracting title from page: {str(e)}")
# Extract PDFs from this page, using page title for PDFs (Approach 2 behavior)
page_pdfs = await extract_pdfs_from_current_page(page, config, source, start_date, end_date, use_page_title_for_pdfs=True, page_title=page_title)
if page_pdfs:
# Check for new (non-duplicate) PDFs
new_pdfs = []
for pdf in page_pdfs:
pdf_url = pdf.get("url", "")
if pdf_url and pdf_url not in seen_pdf_urls:
seen_pdf_urls.add(pdf_url)
new_pdfs.append(pdf)
if new_pdfs:
all_pdfs.extend(new_pdfs)
logger.info(f"๐ Found {len(new_pdfs)} new PDFs on page {i} (total: {len(page_pdfs)} PDFs on page)")
else:
logger.info(f"๐ No new PDFs found on page {i} (all {len(page_pdfs)} PDFs were duplicates)")
else:
logger.info(f"๐ No PDFs found on page {i}")
except Exception as e:
logger.error(f"โ Error processing page {i} ({page_url}): {str(e)}")
continue
logger.info(f"๐ Total unique PDFs found across all pages: {len(all_pdfs)}")
# Debug: Log the structure of returned PDFs
if all_pdfs:
logger.info(f"๐ Sample PDF structure: {all_pdfs[0]}")
else:
logger.warning("โ ๏ธ No PDFs found - this might be the issue")
return all_pdfs
except Exception as e:
logger.error(f"โ Error in page-links-first approach: {str(e)}")
return []
async def check_and_wait_for_recaptcha(page, config: dict):
"""
Check whether a recaptcha challenge is present on the page and, if so, wait for the user to solve it.
Returns:
True if a recaptcha was detected and solved, False if no recaptcha was found
(or an error occurred while checking), or the string "CAPTCHA_TIMEOUT" if a
recaptcha was detected but not solved within the timeout period.
"""
from scraper_common import set_captcha_status, clear_captcha_status
recaptcha_text = config.get("recaptcha_text")
if not recaptcha_text:
return False
try:
# Check if recaptcha text appears on the page
page_content = await page.content()
if recaptcha_text.lower() in page_content.lower():
logger.warning(f"๐ก๏ธ Recaptcha detected on page: {recaptcha_text}")
logger.info("โณ Waiting for user to solve recaptcha (max 60 seconds)...")
logger.info("๐ก Please solve the recaptcha in the browser window")
# Set captcha status for UI
set_captcha_status("๐ก๏ธ Captcha detected! Please complete the captcha challenge in the browser window. Waiting for you to solve it...")
# Wait for recaptcha to disappear (text should no longer be on page)
max_wait_time = 60 # seconds
wait_interval = 2 # check every 2 seconds
waited_time = 0
while waited_time < max_wait_time:
await asyncio.sleep(wait_interval)
waited_time += wait_interval
# Update status message with remaining time
remaining_time = max_wait_time - waited_time
set_captcha_status(f"๐ก๏ธ Captcha detected! Please complete the captcha challenge in the browser window. Time remaining: {remaining_time}s...")
# Check if recaptcha text is still present
current_content = await page.content()
if recaptcha_text.lower() not in current_content.lower():
logger.info("โ
Recaptcha appears to be solved, continuing...")
# Clear captcha status
clear_captcha_status()
# Wait a bit more for page to fully load after recaptcha
await asyncio.sleep(2)
return True
logger.debug(f"โณ Still waiting for recaptcha to be solved... ({waited_time}/{max_wait_time}s)")
logger.warning(f"โ ๏ธ Recaptcha wait timeout ({max_wait_time}s). Continuing anyway...")
# Clear captcha status
clear_captcha_status()
# Return a special value to indicate captcha timeout
return "CAPTCHA_TIMEOUT"
else:
# No captcha detected, clear any previous status
clear_captcha_status()
except Exception as e:
logger.warning(f"โ ๏ธ Error checking for recaptcha: {str(e)}")
clear_captcha_status()
return False
return False
async def collect_all_page_links(page, url: str, config: dict, source: str) -> List[str]:
"""
Collect all page links through pagination
"""
try:
logger.info("๐ Starting page link collection through pagination")
# Check if navigation is configured
navigation_selector = config.get("navigation_selector")
navigation_url_addition = config.get("navigation_url_addition")
start_page = config.get("start_page", 1)
page_links_selector = config.get("page_links")
if not page_links_selector:
logger.error("โ No page_links selector configured")
return []
all_page_links = []
seen_page_urls = set() # Track unique page URLs to detect duplicates
current_page = start_page
consecutive_empty_pages = 0
max_consecutive_empty = 2 # Stop after 2 consecutive pages with no new content
# Navigate to the initial page
await page.goto(url, wait_until="domcontentloaded", timeout=30000)
# Check for recaptcha and wait if present
captcha_result = await check_and_wait_for_recaptcha(page, config)
if captcha_result == "CAPTCHA_TIMEOUT":
logger.error("โ Captcha detected but not solved within timeout period")
return [{
"title": "CAPTCHA_ERROR",
"content": "Captcha detected. Please try again later. The website requires captcha verification which could not be completed automatically.",
"date": datetime.now().strftime("%Y-%m-%d"),
"url": url
}]
# Handle pagination if configured
if navigation_selector and navigation_url_addition:
logger.info(f"๐งญ Navigation configured: selector={navigation_selector}, url_addition={navigation_url_addition}")
logger.info(f"๐ Starting from page: {start_page}")
while True:
logger.info(f"๐ Collecting page links from page {current_page}")
# Check MAX_PAGE_LIMIT if set
if MAX_PAGE_LIMIT is not None and current_page > MAX_PAGE_LIMIT:
logger.info(f"๐ Reached MAX_PAGE_LIMIT ({MAX_PAGE_LIMIT}), stopping pagination")
break
# Navigate to current page if not the first page
if current_page > start_page:
nav_url_addition = navigation_url_addition.replace("{page_no}", str(current_page))
nav_url = construct_navigation_url(url, nav_url_addition)
logger.info(f"๐งญ Navigating to: {nav_url}")
await page.goto(nav_url, wait_until="domcontentloaded", timeout=30000)
# Check for recaptcha and wait if present
captcha_result = await check_and_wait_for_recaptcha(page, config)
if captcha_result == "CAPTCHA_TIMEOUT":
logger.error("โ Captcha detected but not solved within timeout period")
return []
# Check if navigation element exists for next page
nav_element = await page.query_selector(navigation_selector)
if current_page == start_page and nav_element:
logger.info("โ
Navigation element found, more pages available")
elif current_page > start_page and not nav_element:
logger.info("๐ No more navigation elements found, stopping pagination")
break
# Extract page links from current page
# Use MOPND-specific function if this is MOPND
if source == "mopnd":
page_links = await extract_mopnd_page_links_with_dates(page, config)
else:
page_links = await extract_page_links_from_current_page(page, config)
if page_links:
# Check for new (non-duplicate) page links
new_page_links = []
for page_link in page_links:
if page_link and page_link not in seen_page_urls:
seen_page_urls.add(page_link)
new_page_links.append(page_link)
if new_page_links:
all_page_links.extend(new_page_links)
consecutive_empty_pages = 0 # Reset counter
logger.info(f"๐ Found {len(new_page_links)} new page links on page {current_page} (total: {len(page_links)} page links on page)")
else:
consecutive_empty_pages += 1
logger.info(f"๐ No new page links found on page {current_page} (all {len(page_links)} page links were duplicates)")
# Stop if we've had too many consecutive pages with no new content
if consecutive_empty_pages >= max_consecutive_empty:
logger.info(f"๐ Stopping pagination: {consecutive_empty_pages} consecutive pages with no new content")
break
else:
consecutive_empty_pages += 1
logger.info(f"๐ No page links found on page {current_page}")
# Stop if we've had too many consecutive pages with no content
if consecutive_empty_pages >= max_consecutive_empty:
logger.info(f"๐ Stopping pagination: {consecutive_empty_pages} consecutive pages with no content")
break
current_page += 1
else:
# No pagination configured, scrape single page only
logger.info("๐ No navigation configured - collecting page links from single page only")
# Use MOPND-specific function if this is MOPND
if source == "mopnd":
page_links = await extract_mopnd_page_links_with_dates(page, config)
else:
page_links = await extract_page_links_from_current_page(page, config)
all_page_links.extend(page_links)
logger.info(f"๐ Total unique page links collected: {len(all_page_links)}")
return all_page_links
except Exception as e:
logger.error(f"โ Error collecting page links: {str(e)}")
return []
async def extract_page_links_from_current_page(page, config: dict) -> List[str]:
"""
Extract page links from the current page
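For each matched element the href is taken from the element itself, then from an
<a> nested inside it, then from an <a> in its parent element, in that order.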
"""
try:
# Get page links from the page
page_links = []
page_links_selector = config.get("page_links")
if isinstance(page_links_selector, list):
for selector in page_links_selector:
logger.info(f"๐ Looking for page links with selector: {selector}")
elements = await page.query_selector_all(selector)
logger.info(f"๐ฐ Found {len(elements)} elements with selector: {selector}")
for element in elements:
href = await element.get_attribute("href")
if href:
absolute_url = convert_to_absolute_url(href, page.url)
page_links.append(absolute_url)
else:
# If the element itself doesn't have href, look for a link within it or its parent
# First, try to find an <a> tag within the element
link_element = await element.query_selector("a")
if link_element:
href = await link_element.get_attribute("href")
if href:
absolute_url = convert_to_absolute_url(href, page.url)
page_links.append(absolute_url)
continue
# If no link found within, try to find in parent element
try:
parent = await element.evaluate_handle("el => el.parentElement")
if parent:
parent_link = await parent.query_selector("a")
if parent_link:
href = await parent_link.get_attribute("href")
if href:
absolute_url = convert_to_absolute_url(href, page.url)
page_links.append(absolute_url)
except Exception as e:
logger.debug(f"โ ๏ธ Could not find link in parent: {str(e)}")
elif isinstance(page_links_selector, str):
logger.info(f"๐ Looking for page links with selector: {page_links_selector}")
elements = await page.query_selector_all(page_links_selector)
logger.info(f"๐ฐ Found {len(elements)} elements with selector: {page_links_selector}")
for element in elements:
href = await element.get_attribute("href")
if href:
absolute_url = convert_to_absolute_url(href, page.url)
page_links.append(absolute_url)
else:
# If the element itself doesn't have href, look for a link within it or its parent
# First, try to find an <a> tag within the element
link_element = await element.query_selector("a")
if link_element:
href = await link_element.get_attribute("href")
if href:
absolute_url = convert_to_absolute_url(href, page.url)
page_links.append(absolute_url)
continue
# If no link found within, try to find in parent element
try:
parent = await element.evaluate_handle("el => el.parentElement")
if parent:
parent_link = await parent.query_selector("a")
if parent_link:
href = await parent_link.get_attribute("href")
if href:
absolute_url = convert_to_absolute_url(href, page.url)
page_links.append(absolute_url)
except Exception as e:
logger.debug(f"โ ๏ธ Could not find link in parent: {str(e)}")
logger.info(f"๐ Found {len(page_links)} page links on current page")
return page_links
except Exception as e:
logger.error(f"โ Error extracting page links from current page: {str(e)}")
return []
async def extract_mopnd_page_links_with_dates(page, config: dict) -> List[str]:
"""
Extract MOPND page links with dates and titles (special handling for MOPND)
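Side effect: populates the module-level mopnd_article_titles and
mopnd_article_dates caches (keyed by absolute page URL) so that
download_pdfs_via_page_links() can reuse the listing-page title and date.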
"""
try:
logger.info("๐ Extracting MOPND page links with dates and titles")
# Get page link selector
page_links_selector = config.get("page_links")
if not page_links_selector:
logger.warning("โ ๏ธ No page_links selector found in config")
return []
# Get date selector
date_selector = config.get("date")
if not date_selector:
logger.warning("โ ๏ธ No date selector found in config")
return []
# Get title selector
title_selector = config.get("title")
if not title_selector:
logger.warning("โ ๏ธ No title selector found in config")
return []
# Get all page link elements
logger.info(f"๐ Looking for page links with selector: {page_links_selector}")
link_elements = await page.query_selector_all(page_links_selector)
logger.info(f"๐ฐ Found {len(link_elements)} page link elements")
# Get all date elements
logger.info(f"๐ Looking for dates with selector: {date_selector}")
date_elements = await page.query_selector_all(date_selector)
logger.info(f"๐
Found {len(date_elements)} date elements")
# Note: For MOPND, title is extracted from link text itself since title selector is same as page_links
# Extract links, dates, and titles
page_links = []
for i, link_element in enumerate(link_elements):
try:
# Get the href attribute
href = await link_element.get_attribute("href")
if href:
# Convert to absolute URL
absolute_url = convert_to_absolute_url(href, page.url)
page_links.append(absolute_url)
# Extract title from the link text itself (since title selector is same as page_links)
try:
title_text = await link_element.text_content()
if title_text and title_text.strip():
# Store the title for this page URL
mopnd_article_titles[absolute_url] = title_text.strip()
logger.debug(f"โ
Stored title for {absolute_url}: {title_text.strip()}")
except Exception as e:
logger.debug(f"โ ๏ธ Could not extract title from link {i}: {str(e)}")
# Try to get corresponding date
# First try by index (assuming same order)
date_found = False
if i < len(date_elements):
try:
date_text = await date_elements[i].text_content()
if date_text and date_text.strip():
# Store the date for this page URL
mopnd_article_dates[absolute_url] = date_text.strip()
logger.debug(f"โ
Stored date for {absolute_url}: {date_text.strip()}")
date_found = True
except Exception as e:
logger.debug(f"โ ๏ธ Could not extract date for link {i}: {str(e)}")
# If date not found by index, try to find it in the same parent container
if not date_found:
try:
# Get the parent element of the link (look for common container classes)
parent = await link_element.evaluate_handle("el => el.closest('.post_info, .post, [class*=\"post\"], [class*=\"item\"], [class*=\"entry\"]')")
if parent:
# Try to find date element within the same parent
date_in_parent = await parent.query_selector(date_selector)
if date_in_parent:
date_text = await date_in_parent.text_content()
if date_text and date_text.strip():
mopnd_article_dates[absolute_url] = date_text.strip()
logger.debug(f"โ
Stored date from parent container for {absolute_url}: {date_text.strip()}")
date_found = True
except Exception as e:
logger.debug(f"โ ๏ธ Could not find date in parent container: {str(e)}")
if not date_found:
logger.warning(f"โ ๏ธ Could not extract date for link {i} ({absolute_url})")
except Exception as e:
logger.warning(f"โ Error extracting link {i}: {str(e)}")
continue
logger.info(f"๐ Found {len(page_links)} MOPND page links with dates and titles")
logger.info(f"๐ Stored {len(mopnd_article_titles)} titles and {len(mopnd_article_dates)} dates")
# Debug: Show first few stored titles and dates
if mopnd_article_titles:
sample_titles = list(mopnd_article_titles.items())[:3]
logger.debug(f"๐ Sample titles: {sample_titles}")
if mopnd_article_dates:
sample_dates = list(mopnd_article_dates.items())[:3]
logger.debug(f"๐ Sample dates: {sample_dates}")
return page_links
except Exception as e:
logger.error(f"โ Error extracting MOPND page links: {str(e)}")
return []
async def _extract_nbs_pdfs_grouped_by_title(page, config: dict, source: str, start_date: str = None, end_date: str = None) -> List[dict]:
"""
Special NBS handler: Multiple titles on one page, each title can have multiple PDFs
Approach 1: Extract all titles and PDFs, then group PDFs sequentially by title
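Grouping rule (see the arithmetic below): with T titles and P PDF links in document
order, each title gets P // T PDFs and the first P % T titles get one extra,
e.g. 7 PDFs across 3 titles -> groups of 3, 2 and 2.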
"""
try:
logger.info(f"๐ท NBS special handling (Approach 1): Processing multiple titles with grouped PDFs")
# Extract all titles from the page in order
title_selector = config.get("title")
titles = []
if title_selector:
try:
title_elements = await page.query_selector_all(title_selector)
for element in title_elements:
try:
title_text = await element.text_content()
if title_text:
title_text = title_text.strip()
titles.append(title_text)
logger.debug(f"๐ Found title: {title_text}")
except Exception as e:
logger.debug(f"โ ๏ธ Could not extract title text: {str(e)}")
except Exception as e:
logger.warning(f"โ ๏ธ Error extracting titles: {str(e)}")
if not titles:
logger.warning("โ ๏ธ No titles found on NBS page, falling back to standard processing")
return []
logger.info(f"๐ Found {len(titles)} titles on page")
# Extract all PDF links in order
pdf_selector = config.get("pdf_links")
all_pdf_links = []
if isinstance(pdf_selector, list):
for selector in pdf_selector:
try:
elements = await page.query_selector_all(selector)
for element in elements:
href = await element.get_attribute("href")
if href:
absolute_url = convert_to_absolute_url(href, page.url)
try:
link_text = await element.text_content()
pdf_name = link_text.strip() if link_text else ""
except:
pdf_name = ""
if not pdf_name:
url_path = urlparse(absolute_url).path
if url_path:
pdf_name = unquote(os.path.basename(url_path))
if pdf_name.lower().endswith('.pdf'):
pdf_name = pdf_name[:-4]
# Skip PDFs with "Read More" as the name (not actual PDF names)
if pdf_name and pdf_name.strip().lower() == "read more":
logger.debug(f"โญ๏ธ Skipping PDF with 'Read More' name: {absolute_url}")
continue
all_pdf_links.append({
"url": absolute_url,
"name": pdf_name
})
except Exception as e:
logger.debug(f"โ ๏ธ Error with PDF selector '{selector}': {str(e)}")
elif isinstance(pdf_selector, str):
try:
elements = await page.query_selector_all(pdf_selector)
for element in elements:
href = await element.get_attribute("href")
if href:
absolute_url = convert_to_absolute_url(href, page.url)
try:
link_text = await element.text_content()
pdf_name = link_text.strip() if link_text else ""
except:
pdf_name = ""
if not pdf_name:
url_path = urlparse(absolute_url).path
if url_path:
pdf_name = unquote(os.path.basename(url_path))
if pdf_name.lower().endswith('.pdf'):
pdf_name = pdf_name[:-4]
# Skip PDFs with "Read More" as the name (not actual PDF names)
if pdf_name and pdf_name.strip().lower() == "read more":
logger.debug(f"โญ๏ธ Skipping PDF with 'Read More' name: {absolute_url}")
continue
all_pdf_links.append({
"url": absolute_url,
"name": pdf_name
})
except Exception as e:
logger.warning(f"โ ๏ธ Error extracting PDF elements: {str(e)}")
logger.info(f"๐ Found {len(all_pdf_links)} PDF links on page")
if not all_pdf_links:
logger.warning("โ ๏ธ No PDF links found on NBS page")
return []
# Group PDFs by title: Divide PDFs evenly among titles, or use sequential matching
# Simple approach: Divide PDFs evenly among titles
pdfs_per_title = len(all_pdf_links) // len(titles) if len(titles) > 0 else 0
remainder = len(all_pdf_links) % len(titles)
title_pdf_groups = []
pdf_index = 0
for i, title in enumerate(titles):
# Calculate how many PDFs this title gets
num_pdfs = pdfs_per_title + (1 if i < remainder else 0)
# Get PDFs for this title
title_pdfs = all_pdf_links[pdf_index:pdf_index + num_pdfs]
pdf_index += num_pdfs
if title_pdfs:
title_pdf_groups.append({
"title": title,
"pdfs": title_pdfs
})
logger.info(f"๐ Title '{title}': {len(title_pdfs)} associated PDFs")
if not title_pdf_groups:
logger.warning("โ ๏ธ No title-PDF groups created")
return []
# Extract dates from page
date_selector = config.get("date")
date_elements = []
if date_selector:
try:
date_elements = await page.query_selector_all(date_selector)
except Exception as e:
logger.debug(f"โ ๏ธ Could not extract date elements: {str(e)}")
# Process each title group: Try all PDFs, if some work, create document
all_documents = []
for group_idx, group in enumerate(title_pdf_groups):
if scraping_cancelled():
logger.info("๐ Scraping cancelled, stopping NBS processing")
break
if is_pdf_limit_reached():
logger.info(f"๐ Global PDF limit reached ({MAX_PDF_LIMIT}), stopping NBS processing")
break
title = group["title"]
pdf_list = group["pdfs"]
logger.info(f"๐ท Processing title {group_idx+1}/{len(title_pdf_groups)}: '{title}' ({len(pdf_list)} PDFs)")
# Try all PDFs for this title
successful_pdfs = []
combined_text_parts = []
all_pdf_paths = []
total_size = 0
for pdf_idx, pdf_info in enumerate(pdf_list):
if scraping_cancelled():
break
if is_pdf_limit_reached():
break
pdf_url = pdf_info["url"]
pdf_link_name = pdf_info.get("name", "") or f"PDF {pdf_idx+1}"
# Skip PDFs with "Read More" as the name (not actual PDF names)
if pdf_link_name and pdf_link_name.strip().lower() == "read more":
logger.info(f" โญ๏ธ Skipping PDF with 'Read More' name: {pdf_url}")
continue
logger.info(f" โฌ๏ธ Trying PDF {pdf_idx+1}/{len(pdf_list)}: {pdf_link_name}")
try:
download_result = download_and_save_pdf(pdf_url, source)
if download_result["success"]:
local_pdf_path = download_result["path"]
extracted_text = extract_text_from_pdf_file(local_pdf_path)
if extracted_text and len(extracted_text.strip()) > 10:
current_count = increment_global_pdf_count()
successful_pdfs.append({
"url": pdf_url,
"path": local_pdf_path,
"name": pdf_link_name,
"size": download_result["size"],
"text": extracted_text
})
combined_text_parts.append(f"=== {pdf_link_name} ===\n{extracted_text}")
all_pdf_paths.append(local_pdf_path)
total_size += download_result["size"]
logger.info(f" โ
Successfully processed PDF '{pdf_link_name}' (Global: {current_count}/{MAX_PDF_LIMIT})")
else:
logger.warning(f" โ ๏ธ PDF downloaded but no text extracted: {pdf_link_name}")
else:
logger.warning(f" โ Failed to download PDF: {download_result.get('message', 'Unknown error')}")
except Exception as e:
logger.error(f" โ Error processing PDF: {str(e)}")
continue
# Create document if at least one PDF succeeded (Approach 1: if some work, get PDF)
if successful_pdfs:
# Extract date (use first date element or group index if multiple dates)
pdf_date_raw = ""
if date_elements:
date_idx = min(group_idx, len(date_elements) - 1)
try:
date_text = await date_elements[date_idx].text_content()
if date_text:
pdf_date_raw = date_text.strip()
except:
pass
# Standardize the date to YYYY-MM-DD format
pdf_date = standardize_date(pdf_date_raw, default_to_current=True)
if not pdf_date:
pdf_date = datetime.now().strftime("%Y-%m-%d")
# Check date range filtering
if start_date or end_date:
start_dt = parse_date_input(start_date) if start_date else None
end_dt = parse_date_input(end_date) if end_date else None
if not is_date_in_range(pdf_date, start_dt, end_dt, include_missing=False):
logger.info(f"๐
Title date {pdf_date} is outside date range - skipping")
continue
# Combine all PDF texts
combined_text = "\n\n".join(combined_text_parts)
primary_path = all_pdf_paths[0] if all_pdf_paths else ""
all_documents.append({
"url": successful_pdfs[0]["url"],
"local_path": primary_path,
"size": total_size,
"title": title,
"source": source,
"extracted_text": combined_text,
"file_type": "PDF",
"date": pdf_date,
"nbs_pdf_count": len(successful_pdfs),
"nbs_all_paths": all_pdf_paths
})
logger.info(f"โ
Created document for title '{title}' with {len(successful_pdfs)}/{len(pdf_list)} successful PDFs")
else:
logger.warning(f"โ ๏ธ No PDFs successfully processed for title: '{title}' - moving forward")
logger.info(f"๐ NBS Processing Summary: {len(all_documents)} documents created from {len(title_pdf_groups)} titles")
return all_documents
except Exception as e:
logger.error(f"โ Error in NBS PDF extraction: {str(e)}")
return []
async def extract_pdfs_from_current_page(page, config: dict, source: str, start_date: str = None, end_date: str = None, use_page_title_for_pdfs: bool = False, page_title: str = None) -> List[dict]:
"""
Extract PDFs from the current page
Special handling for NBS: Multiple titles on one page, each title can have multiple PDFs
Args:
page: Playwright page object
config: Website configuration dict
source: Source name
start_date: Optional start date for filtering
end_date: Optional end date for filtering
use_page_title_for_pdfs: If True, use page title for PDFs (Approach 2 behavior)
page_title: Pre-extracted page title (optional, will extract if not provided and use_page_title_for_pdfs is True)
"""
try:
# Special handling for NBS: Group PDFs by title
is_nbs = source.lower() in ["nbs", "nbs somalia"]
if is_nbs:
return await _extract_nbs_pdfs_grouped_by_title(page, config, source, start_date, end_date)
# Standard handling for other sources: Each PDF/file gets its own document
# Get PDF links from the page (with link text for name extraction)
pdf_links = []
pdf_selector = config.get("pdf_links")
if isinstance(pdf_selector, list):
for selector in pdf_selector:
elements = await page.query_selector_all(selector)
for element in elements:
# Try href first, then button-url (for FEWS custom elements)
href = await element.get_attribute("href")
if not href:
href = await element.get_attribute("button-url")
if href:
absolute_url = convert_to_absolute_url(href, page.url)
# Extract link text for PDF name
try:
link_text = await element.text_content()
pdf_name = link_text.strip() if link_text else ""
except Exception as e:
logger.debug(f"โ ๏ธ Could not extract link text: {str(e)}")
pdf_name = ""
# If no link text, try to extract filename from URL
if not pdf_name:
url_path = urlparse(absolute_url).path
if url_path:
pdf_name = unquote(os.path.basename(url_path))
# Remove .pdf extension if present (we'll add it back if needed)
if pdf_name.lower().endswith('.pdf'):
pdf_name = pdf_name[:-4]
pdf_links.append({
"url": absolute_url,
"name": pdf_name,
"file_type": "PDF"
})
elif isinstance(pdf_selector, str):
elements = await page.query_selector_all(pdf_selector)
for element in elements:
# Try href first, then button-url (for FEWS custom elements)
href = await element.get_attribute("href")
if not href:
href = await element.get_attribute("button-url")
if href:
absolute_url = convert_to_absolute_url(href, page.url)
# Extract link text for PDF name
try:
link_text = await element.text_content()
pdf_name = link_text.strip() if link_text else ""
except Exception as e:
logger.debug(f"โ ๏ธ Could not extract link text: {str(e)}")
pdf_name = ""
# If no link text, try to extract filename from URL
if not pdf_name:
from urllib.parse import unquote
url_path = urlparse(absolute_url).path
if url_path:
pdf_name = unquote(os.path.basename(url_path))
# Remove .pdf extension if present (we'll add it back if needed)
if pdf_name.lower().endswith('.pdf'):
pdf_name = pdf_name[:-4]
pdf_links.append({
"url": absolute_url,
"name": pdf_name,
"file_type": "PDF"
})
# Get file links (CSV, etc.) from the page if configured
file_links = []
file_selector = config.get("file_links")
if file_selector:
# Determine file type from URL or config
file_type = "CSV" # Default to CSV
if isinstance(file_selector, list):
for selector in file_selector:
elements = await page.query_selector_all(selector)
for element in elements:
href = await element.get_attribute("href")
if href:
absolute_url = convert_to_absolute_url(href, page.url)
# Determine file type from URL
if absolute_url.lower().endswith('.csv'):
file_type = "CSV"
elif absolute_url.lower().endswith(('.xlsx', '.xls')):
file_type = "XLSX"
elif absolute_url.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
file_type = "PNG" # Image files
else:
file_type = "CSV" # Default
# Extract link text for file name
try:
link_text = await element.text_content()
file_name = link_text.strip() if link_text else ""
except Exception as e:
logger.debug(f"โ ๏ธ Could not extract link text: {str(e)}")
file_name = ""
# If no link text, try to extract filename from URL
if not file_name:
url_path = urlparse(absolute_url).path
if url_path:
file_name = unquote(os.path.basename(url_path))
# Remove file extension if present
for ext in ['.csv', '.xlsx', '.xls', '.png', '.jpg', '.jpeg', '.gif', '.webp']:
if file_name.lower().endswith(ext):
file_name = file_name[:-len(ext)]
break
file_links.append({
"url": absolute_url,
"name": file_name,
"file_type": file_type
})
elif isinstance(file_selector, str):
elements = await page.query_selector_all(file_selector)
for element in elements:
href = await element.get_attribute("href")
if href:
absolute_url = convert_to_absolute_url(href, page.url)
# Determine file type from URL
if absolute_url.lower().endswith('.csv'):
file_type = "CSV"
elif absolute_url.lower().endswith(('.xlsx', '.xls')):
file_type = "XLSX"
elif absolute_url.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
file_type = "PNG" # Image files
else:
file_type = "CSV" # Default
# Extract link text for file name
try:
link_text = await element.text_content()
file_name = link_text.strip() if link_text else ""
except Exception as e:
logger.debug(f"โ ๏ธ Could not extract link text: {str(e)}")
file_name = ""
# If no link text, try to extract filename from URL
if not file_name:
url_path = urlparse(absolute_url).path
if url_path:
file_name = unquote(os.path.basename(url_path))
# Remove file extension if present
for ext in ['.csv', '.xlsx', '.xls', '.png', '.jpg', '.jpeg', '.gif', '.webp']:
if file_name.lower().endswith(ext):
file_name = file_name[:-len(ext)]
break
file_links.append({
"url": absolute_url,
"name": file_name,
"file_type": file_type
})
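# Note: in both branches above, file_type is inferred purely from the URL extension;
# any unrecognised extension falls back to "CSV".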
# Combine PDF and file links
all_links = pdf_links + file_links
logger.info(f"๐ Found {len(pdf_links)} PDF links and {len(file_links)} file links on current page (total: {len(all_links)})")
# Log CSV files specifically for debugging
csv_files = [link for link in file_links if link.get("file_type") == "CSV"]
if csv_files:
logger.info(f"๐ Found {len(csv_files)} CSV file(s) to process:")
for csv_file in csv_files:
logger.info(f" - CSV: {csv_file.get('name', 'Unknown')} at {csv_file.get('url', 'Unknown URL')}")
# Extract page title using the title selector from config (if not already provided)
if page_title is None:
page_title = ""
title_selector = config.get("title")
if title_selector:
try:
title_element = await page.query_selector(title_selector)
if title_element:
page_title = await title_element.text_content()
if page_title:
page_title = page_title.strip()
logger.info(f"๐ Extracted page title: {page_title}")
else:
logger.debug(f"โ ๏ธ Title element found but no text content")
else:
logger.debug(f"โ ๏ธ Title element not found with selector: {title_selector}")
except Exception as e:
logger.warning(f"โ ๏ธ Error extracting page title: {str(e)}")
elif page_title:
logger.info(f"๐ Using provided page title: {page_title}")
# Try to extract dates from the page for date filtering
date_selector = config.get("date")
date_elements = []
if date_selector:
try:
date_elements = await page.query_selector_all(date_selector)
logger.debug(f"๐
Found {len(date_elements)} date elements on current page")
except Exception as e:
logger.debug(f"โ ๏ธ Could not extract date elements: {str(e)}")
# Download each PDF/file
downloaded_pdfs = []
for i, file_info in enumerate(all_links):
if scraping_cancelled():
logger.info("๐ Scraping cancelled, stopping file downloads")
break
# Check global PDF limit before processing
if is_pdf_limit_reached():
logger.info(f"๐ Global PDF limit reached ({MAX_PDF_LIMIT}), stopping file processing")
break
file_url = file_info["url"]
file_name = file_info.get("name", "") # Individual file name from link text
file_type = file_info.get("file_type", "PDF")
# Determine title priority based on context
if use_page_title_for_pdfs and page_title:
# Approach 2: Use page title for files (when navigating to individual pages)
file_name = page_title
logger.info(f"๐ Using page title for {file_type} (Approach 2): {file_name}")
elif file_name:
# Approach 1: Priority to individual file link text
# Clean up the file name (remove extra whitespace, newlines, etc.)
file_name = " ".join(file_name.split())
logger.info(f"๐ Using {file_type} link text as name: {file_name}")
elif page_title:
# Fallback: Use page title if individual file name is missing
file_name = page_title
logger.info(f"๐ Using page title as fallback for {file_type}: {file_name}")
else:
# Last resort fallback
current_count = get_global_pdf_count() + 1
file_name = f"{file_type} {current_count}"
logger.info(f"๐ Using fallback name: {file_name}")
logger.info(f"โฌ๏ธ Downloading {file_type} {i+1}/{len(all_links)}: {file_url}")
logger.info(f"๐ {file_type} name: {file_name}")
logger.info(f"๐ Global PDF count: {get_global_pdf_count()}/{MAX_PDF_LIMIT}")
try:
# Download based on file type
if file_type == "PDF":
download_result = download_and_save_pdf(file_url, source)
else:
# For CSV and other files
download_result = download_and_save_file(file_url, source, file_type.lower())
if download_result["success"]:
local_file_path = download_result["path"]
extracted_text = ""
# Extract text only for PDFs
if file_type == "PDF":
logger.info(f"๐ Extracting text from local file: {local_file_path}")
extracted_text = extract_text_from_pdf_file(local_file_path)
logger.info(f"๐ Extracted text length: {len(extracted_text)} characters")
if not extracted_text:
logger.warning("โ ๏ธ No text extracted from PDF")
elif file_type == "CSV":
# Special handling for CSV files: read a preview of the content
try:
import csv
logger.info(f"๐ Reading CSV file preview: {local_file_path}")
with open(local_file_path, 'r', encoding='utf-8', errors='ignore') as csv_file:
csv_reader = csv.reader(csv_file)
# Read first 10 rows as preview
preview_rows = []
for idx, row in enumerate(csv_reader):
if idx >= 10:
break
preview_rows.append(row)
# Convert to text preview
if preview_rows:
# Get headers if available
headers = preview_rows[0] if len(preview_rows) > 0 else []
data_rows = preview_rows[1:] if len(preview_rows) > 1 else []
# Extract location from title for icpac_seasonal_forecast
location_info = ""
if source == "icpac_seasonal_forecast" and file_name:
location_info = f"Location: {file_name}\n"
# Create a readable preview
preview_text = f"CSV File: {file_name}\n"
if location_info:
preview_text += location_info
preview_text += f"File Path: {local_file_path}\n"
preview_text += f"Total Rows Previewed: {len(preview_rows)}\n\n"
if headers:
preview_text += "Headers: " + ", ".join(str(h) for h in headers) + "\n\n"
if data_rows:
preview_text += "Sample Data (first few rows):\n"
for row in data_rows[:5]: # Show first 5 data rows
preview_text += ", ".join(str(cell) for cell in row) + "\n"
extracted_text = preview_text
logger.info(f"๐ CSV preview extracted: {len(extracted_text)} characters")
else:
location_info = ""
if source == "icpac_seasonal_forecast" and file_name:
location_info = f"Location: {file_name}\n"
extracted_text = f"CSV File: {file_name}\n"
if location_info:
extracted_text += location_info
extracted_text += f"File Path: {local_file_path}\n(File is empty or could not be read)"
logger.warning("โ ๏ธ CSV file appears to be empty")
except Exception as e:
logger.warning(f"โ ๏ธ Could not read CSV preview: {str(e)}")
location_info = ""
if source == "icpac_seasonal_forecast" and file_name:
location_info = f"Location: {file_name}\n"
extracted_text = f"CSV File: {file_name}\n"
if location_info:
extracted_text += location_info
extracted_text += f"File Path: {local_file_path}\n(Preview could not be generated: {str(e)})"
elif file_type == "PNG":
# Special handling for PNG files (images) - mention location from title
location_info = ""
if source == "icpac_seasonal_forecast" and file_name:
location_info = f"Location: {file_name}\n"
extracted_text = f"PNG File: {file_name}\n"
if location_info:
extracted_text += location_info
extracted_text += f"File Path: {local_file_path}\n"
extracted_text += "(PNG image file downloaded successfully)"
logger.info(f"๐ PNG file info extracted: {file_name}")
else:
# For other file types (XLSX, etc.)
logger.info(f"๐ {file_type} file downloaded (no text extraction needed)")
extracted_text = f"{file_type} File: {file_name}\nFile Path: {local_file_path}"
# Extract date if available from listing page
file_date_raw = ""
if source == "mopnd":
# For MOPND, use the current page URL (not the PDF URL) to look up the date
current_page_url = page.url
# Try exact match first
if current_page_url in mopnd_article_dates:
file_date_raw = mopnd_article_dates[current_page_url]
logger.debug(f"โ
Using MOPND date from cache (page URL: {current_page_url}): {file_date_raw}")
else:
# Try to find a matching URL (handle query params, trailing slashes)
page_url_parsed = urlparse(current_page_url)
page_url_normalized = urlunparse((page_url_parsed.scheme, page_url_parsed.netloc, page_url_parsed.path, '', '', ''))
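# Illustrative: "https://example.org/news/item/?utm_source=x#top" normalizes to
# "https://example.org/news/item/" (params, query and fragment are dropped for matching).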
# Try normalized URL
matching_url = None
for cached_url in mopnd_article_dates.keys():
cached_parsed = urlparse(cached_url)
cached_normalized = urlunparse((cached_parsed.scheme, cached_parsed.netloc, cached_parsed.path, '', '', ''))
if cached_normalized == page_url_normalized:
matching_url = cached_url
break
if matching_url:
file_date_raw = mopnd_article_dates[matching_url]
logger.debug(f"โ
Using MOPND date from cache (matched normalized URL): {file_date_raw}")
else:
logger.warning(f"โ ๏ธ MOPND date not found in cache for page URL: {current_page_url}")
logger.debug(f"๐ Available page URLs in cache: {list(mopnd_article_dates.keys())[:3]}")
elif i < len(date_elements):
try:
date_text = await date_elements[i].text_content()
if date_text:
file_date_raw = date_text.strip()
logger.debug(f"โ
Extracted raw date from listing page: {file_date_raw}")
except Exception as e:
logger.debug(f"โ ๏ธ Could not extract date for {file_type} {i+1}: {str(e)}")
# Standardize the date to YYYY-MM-DD format
file_date = standardize_date(file_date_raw, default_to_current=True)
if not file_date:
file_date = datetime.now().strftime("%Y-%m-%d")
# Check date range filtering
if start_date or end_date:
start_dt = parse_date_input(start_date) if start_date else None
end_dt = parse_date_input(end_date) if end_date else None
if not is_date_in_range(file_date, start_dt, end_dt, include_missing=False):
logger.info(f"๐
{file_type} date {file_date} is outside date range [{start_date}, {end_date}] - filtering out")
continue
# Increment global PDF counter
current_count = increment_global_pdf_count()
downloaded_pdfs.append({
"url": file_url,
"local_path": local_file_path,
"size": download_result["size"],
"title": file_name, # Use extracted name from link text
"source": source,
"extracted_text": extracted_text,
"file_type": file_type,
"date": file_date
})
logger.info(f"โ
Successfully downloaded and processed {file_type} '{file_name}' (Global: {current_count}/{MAX_PDF_LIMIT})")
else:
logger.warning(f"โ Failed to download {file_type} {i+1}: {download_result['message']}")
except Exception as e:
logger.error(f"โ Error downloading {file_type} {i+1}: {str(e)}")
continue
return downloaded_pdfs
except Exception as e:
logger.error(f"โ Error extracting PDFs from current page: {str(e)}")
return []
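# Minimal usage sketch (assumes a Playwright page already navigated to a listing page and a
# matching entry from WEBSITE_CONFIG; the source name and dates below are illustrative):
#
#     docs = await extract_pdfs_from_current_page(
#         page, config, source="example_source",
#         start_date="2024-01-01", end_date="2024-12-31",
#     )
#     for doc in docs:
#         print(doc["title"], doc["file_type"], doc["local_path"], doc["date"])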
async def extract_document_content_unified(page, document_url: str, config: dict, website_type: str = None, pdf_count: int = 0, start_date: str = None, end_date: str = None) -> dict:
"""
Unified function to extract content from a single document (PDF-focused)
With 5 retry attempts for loading documents
"""
try:
# Navigate to document with retry logic (5 attempts)
max_retries = 5
retry_count = 0
page_loaded = False
while retry_count < max_retries and not page_loaded:
try:
retry_count += 1
logger.info(f"๐ Loading document (attempt {retry_count}/{max_retries}): {document_url}")
# Navigate with different strategies based on attempt
if retry_count == 1:
# First attempt: Use domcontentloaded for faster loading
await page.goto(document_url, wait_until="domcontentloaded", timeout=30000)
elif retry_count == 2:
# Second attempt: Use basic loading
await page.goto(document_url, timeout=20000)
elif retry_count == 3:
# Third attempt: Use networkidle
await page.goto(document_url, wait_until="networkidle", timeout=15000)
else:
# Fourth and fifth attempts: Try with shorter timeouts
await page.goto(document_url, timeout=10000)
logger.info(f"โ
Successfully loaded document on attempt {retry_count}")
page_loaded = True
except Exception as e:
logger.warning(f"โ ๏ธ Attempt {retry_count} failed for {document_url}: {str(e)}")
if retry_count >= max_retries:
logger.error(f"โ Failed to load document after {max_retries} attempts: {document_url}")
return {
"title": "Network Error",
"content": f"Failed to access document after {max_retries} attempts: {str(e)}",
"date": datetime.now().strftime("%Y-%m-%d"),
"url": document_url
}
# Wait before retry
await asyncio.sleep(2)
if not page_loaded:
return {
"title": "Network Error",
"content": f"Failed to access document after {max_retries} attempts",
"date": datetime.now().strftime("%Y-%m-%d"),
"url": document_url
}
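# Navigation retry ladder used above: domcontentloaded (30s) -> default load (20s) ->
# networkidle (15s) -> bare goto (10s, attempts 4-5), with a 2-second pause between failed attempts.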
# Extract title from page using title selector (priority source)
title = ""
title_extracted_from_page = False
# For MOPND, use the title extracted from the main page
if website_type == "mopnd" and document_url in mopnd_article_titles:
title = mopnd_article_titles[document_url]
title_extracted_from_page = True
logger.debug(f"โ
Using MOPND title from main page: {title}")
elif website_type == "mopnd":
logger.warning(f"โ ๏ธ MOPND title not found in cache for URL: {document_url}")
logger.debug(f"๐ Available titles: {list(mopnd_article_titles.keys())[:3]}")
else:
# Regular title extraction for other websites using title selector from config
title_selector = config.get("title")
if title_selector:
try:
title_element = await page.query_selector(title_selector)
if title_element:
title = await title_element.text_content()
if title:
title = title.strip()
title_extracted_from_page = True
logger.info(f"โ
Extracted title from page using selector '{title_selector}': {title}")
else:
logger.debug(f"โ ๏ธ Title element found but no text content with selector: {title_selector}")
else:
logger.debug(f"โ ๏ธ Title element not found with selector: {title_selector}")
except Exception as e:
logger.warning(f"Error extracting title with selector '{title_selector}': {str(e)}")
else:
logger.warning("โ ๏ธ No title selector found in config")
# Use the passed website_type or try to determine it from config
if website_type is None:
for site_type, site_config in WEBSITE_CONFIG.items():
if site_config == config:
website_type = site_type
break
if website_type is None:
website_type = "unknown"
content = ""
pdf_path = ""
# For document-focused sites, check for PDF links
# Dynamically determine if this is a PDF website
pdf_websites = get_pdf_websites()
if website_type in pdf_websites:
pdf_links = []
try:
# Get PDF selectors from config
pdf_links_selector = config.get("pdf_links")
# Initialize elements list
pdf_elements = []
# Handle different formats in config
if isinstance(pdf_links_selector, list):
# Process each selector in the array
logger.info(f"๐ Processing array of {len(pdf_links_selector)} PDF selectors")
for selector in pdf_links_selector:
try:
elements = await page.query_selector_all(selector)
logger.info(f"๐ Found {len(elements)} elements with selector {selector}")
pdf_elements.extend(elements)
except Exception as e:
logger.warning(f"โ Error with selector '{selector}': {str(e)}")
elif isinstance(pdf_links_selector, str):
# Old format with single string selector
logger.info(f"๐ Using string selector: {pdf_links_selector}")
pdf_elements = await page.query_selector_all(pdf_links_selector)
else:
logger.warning("โ ๏ธ No pdf_links selector in config, skipping PDF extraction")
# Extract PDF URLs and names from elements
logger.debug(f"๐ Processing {len(pdf_elements)} PDF elements for {website_type}")
for i, element in enumerate(pdf_elements):
try:
logger.debug(f"๐ Extracting PDF URL from element {i+1}/{len(pdf_elements)}")
# Get the href attribute, or button-url for FEWS custom elements
href = await element.get_attribute("href")
if not href:
href = await element.get_attribute("button-url")
if href:
# Convert relative URLs to absolute URLs
absolute_url = convert_to_absolute_url(href, page.url)
# Extract link text for PDF name
try:
link_text = await element.text_content()
pdf_name = link_text.strip() if link_text else ""
except Exception as e:
logger.debug(f"โ ๏ธ Could not extract link text: {str(e)}")
pdf_name = ""
# If no link text, try to extract filename from URL
if not pdf_name:
from urllib.parse import unquote
url_path = urlparse(absolute_url).path
if url_path:
pdf_name = unquote(os.path.basename(url_path))
# Remove .pdf extension if present
if pdf_name.lower().endswith('.pdf'):
pdf_name = pdf_name[:-4]
pdf_links.append({
"url": absolute_url,
"name": pdf_name
})
logger.info(f"๐ Found PDF URL: {absolute_url}")
if pdf_name:
logger.info(f"๐ PDF name: {pdf_name}")
else:
logger.debug(f"โ ๏ธ No href or button-url attribute found on element {i+1}")
except Exception as e:
logger.warning(f"โ Error extracting PDF URL from element {i+1}: {str(e)}")
continue
except Exception as e:
logger.warning(f"Error extracting PDF links: {str(e)}")
pdf_links = []
if pdf_links:
logger.info(f"๐ Found {len(pdf_links)} PDF links, processing...")
# Process all PDF links (up to limit)
pdf_content_parts = []
for i, pdf_info in enumerate(pdf_links):
if MAX_PDF_LIMIT is not None and pdf_count >= MAX_PDF_LIMIT:
logger.info(f"๐ Reached PDF limit ({MAX_PDF_LIMIT}), stopping PDF processing")
break
# Handle both old format (string) and new format (dict)
if isinstance(pdf_info, dict):
pdf_url = pdf_info["url"]
pdf_name = pdf_info.get("name", "")
else:
# Backward compatibility: if it's still a string
pdf_url = pdf_info
pdf_name = ""
try:
logger.info(f"๐ Processing PDF {i+1}/{len(pdf_links)}: {pdf_url}")
if pdf_name:
logger.info(f"๐ PDF name: {pdf_name}")
# First try to download the PDF to get the local path
download_result = download_and_save_pdf(pdf_url, website_type)
if download_result["success"]:
# Set the PDF path to the local downloaded file
pdf_path = download_result["path"]
logger.info(f"๐ PDF downloaded to: {pdf_path}")
# Now extract text from the downloaded PDF
pdf_content = extract_text_from_pdf_file(pdf_path)
if pdf_content and len(pdf_content.strip()) > 10:
# Use extracted PDF name if available, otherwise use generic label
pdf_label = pdf_name if pdf_name else f"PDF {i+1}"
pdf_content_parts.append(f"{pdf_label} Content:\n{pdf_content}")
logger.info(f"โ
Extracted {len(pdf_content)} characters from {pdf_label}")
# Only use PDF name as title if page title extraction completely failed
# Priority: page title selector > PDF name > PDF content
if pdf_name and not title_extracted_from_page and not title:
title = pdf_name
logger.info(f"๐ Using PDF name as title (page title extraction failed): {title}")
else:
logger.warning(f"โ ๏ธ No content extracted from PDF {i+1}")
else:
logger.warning(f"โ Failed to download PDF {i+1}: {download_result['message']}")
pdf_count += 1
logger.info(f"๐ PDF {pdf_count}/{MAX_PDF_LIMIT} processed")
except Exception as e:
logger.warning(f"โ Error processing PDF {i+1}: {str(e)}")
continue
# Combine all PDF content
if pdf_content_parts:
content = "\n\n".join(pdf_content_parts)
logger.info(f"๐ Combined PDF content: {len(content)} characters total")
# Only extract title from PDF content as absolute last resort
# Priority: page title selector > PDF name > PDF content
if not title_extracted_from_page and not title and content and len(content) > 50:
lines = content.split('\n')[:5]
for line in lines:
if line.strip() and len(line.strip()) > 10 and len(line.strip()) < 100:
title = line.strip()
logger.info(f"๐ Using title extracted from PDF content (page title extraction failed): {title}")
break
else:
logger.warning("โ ๏ธ No PDF content extracted, skipping document")
content = ""
else:
# No PDF links found, skip document
logger.info("๐ No PDF links found, skipping document")
content = ""
# Extract date using configuration selector
date_raw = ""
# For MOPND, use the date extracted from the main page
if website_type == "mopnd" and document_url in mopnd_article_dates:
date_raw = mopnd_article_dates[document_url]
logger.debug(f"โ
Using MOPND date from main page: {date_raw}")
elif website_type == "mopnd":
logger.warning(f"โ ๏ธ MOPND date not found in cache for URL: {document_url}")
logger.debug(f"๐ Available dates: {list(mopnd_article_dates.keys())[:3]}")
else:
# Regular date extraction for other websites
date_selector = config.get("date")
if date_selector:
try:
date_element = await page.query_selector(date_selector)
if date_element:
date_raw = await date_element.text_content()
if date_raw:
date_raw = date_raw.strip()
logger.debug(f"โ
Extracted raw date: {date_raw}")
except Exception as e:
logger.warning(f"Error extracting date with selector {date_selector}: {str(e)}")
# Standardize the date to YYYY-MM-DD format
date = standardize_date(date_raw, default_to_current=True)
if not date:
date = datetime.now().strftime("%Y-%m-%d")
logger.info(f"No date found with config selector, using current date: {date}")
# Check date range filtering
if start_date or end_date:
start_dt = parse_date_input(start_date) if start_date else None
end_dt = parse_date_input(end_date) if end_date else None
if not is_date_in_range(date, start_dt, end_dt, include_missing=False):
logger.info(f"๐
Document date {date} is outside date range [{start_date}, {end_date}] - filtering out")
return None
# Skip documents with no content (for PDF-based sites)
# Dynamically determine if this is a PDF website
pdf_websites = get_pdf_websites()
if website_type in pdf_websites:
if not content or len(content.strip()) < 10:
logger.info(f"๐ Skipping document with no PDF content: {document_url}")
return None
result = {
"title": title or "No title found",
"content": content or "No content found",
"date": date,
"url": document_url
}
# Add PDF path for PDF-based sites
# Dynamically determine if this is a PDF website
pdf_websites = get_pdf_websites()
if website_type in pdf_websites:
if pdf_path:
result["pdf_path"] = pdf_path
logger.info(f"๐ Added PDF path to result: {pdf_path}")
else:
logger.warning("โ ๏ธ No PDF path available for PDF-based site")
return result
except Exception as e:
logger.error(f"Error extracting content from {document_url}: {str(e)}")
return {
"title": "Error",
"content": f"Error extracting content: {str(e)}",
"date": datetime.now().strftime("%Y-%m-%d"),
"url": document_url
}
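# Minimal usage sketch (illustrative; not called anywhere in this module). It assumes Playwright is
# installed and that WEBSITE_CONFIG contains an entry for the chosen website_type; the URL and
# website_type below are placeholders.
#
#     from playwright.async_api import async_playwright
#
#     async def _example_extract_single_document(document_url: str, website_type: str) -> dict:
#         async with async_playwright() as p:
#             browser = await p.chromium.launch(headless=True)
#             page = await browser.new_page()
#             config = WEBSITE_CONFIG.get(website_type, {})
#             doc = await extract_document_content_unified(page, document_url, config, website_type=website_type)
#             await browser.close()
#             return doc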