from scraper_common import scrape_news_async, get_pdf_websites
from datetime import datetime
import os
import requests
from urllib.parse import urlparse


def create_archive_folders(source: str, date: str = None) -> dict:
    """
    Create an organized archive folder structure for document downloads.

    Returns a dictionary of document-type folders:
    {
        'date_folder': date_folder,
        'pdf_folder': pdf_folder,
        'doc_folder': doc_folder,
        'csv_folder': csv_folder
    }
    """
    if date is None:
        date = datetime.now().strftime("%Y-%m-%d")

    # Base archive folder
    archive_folder = "archive"
    if not os.path.exists(archive_folder):
        os.makedirs(archive_folder)

    # Normalize the source name so all FS Cluster downloads share one folder
    if source.lower() in ["fs cluster", "fscluster"]:
        source = "FS Cluster"

    # Per-source folder
    source_folder = os.path.join(archive_folder, source)
    if not os.path.exists(source_folder):
        os.makedirs(source_folder)

    # Per-date folder under the source
    date_folder = os.path.join(source_folder, date)
    if not os.path.exists(date_folder):
        os.makedirs(date_folder)

    # Document-type subfolders
    pdf_folder = os.path.join(date_folder, "pdf")
    doc_folder = os.path.join(date_folder, "doc")
    csv_folder = os.path.join(date_folder, "csv")

    for folder in [pdf_folder, doc_folder, csv_folder]:
        if not os.path.exists(folder):
            os.makedirs(folder)

    return {
        'date_folder': date_folder,
        'pdf_folder': pdf_folder,
        'doc_folder': doc_folder,
        'csv_folder': csv_folder
    }
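

# Illustrative usage sketch (the source name and date below are assumptions,
# not values taken from elsewhere in this module):
#
#     folders = create_archive_folders("ReliefWeb", date="2025-01-01")
#     # folders["pdf_folder"] == os.path.join("archive", "ReliefWeb", "2025-01-01", "pdf")
#     # folders["doc_folder"] == os.path.join("archive", "ReliefWeb", "2025-01-01", "doc")
#     # folders["csv_folder"] == os.path.join("archive", "ReliefWeb", "2025-01-01", "csv")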


def download_document(doc_url: str, folder_paths: dict, filename: str = None) -> tuple:
    """
    Download a document into the appropriate document-type folder.

    Returns a tuple of (local_path, file_type), or (None, None) on failure.
    """
    try:
        # Derive a filename from the URL if one was not supplied
        if not filename:
            parsed_url = urlparse(doc_url)
            filename = os.path.basename(parsed_url.path)
            if not filename or 'downloadfile' in filename:
                # Opaque download endpoints get a timestamped name
                filename = f"document_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        file_type = "unknown"

        # First guess the file type from the URL itself
        # ('MjAyNS' is an opaque token seen in some encoded download links)
        if (doc_url.lower().endswith('.pdf') or
                'pdf' in doc_url.lower() or
                'downloadfile' in doc_url.lower() or
                'MjAyNS' in doc_url):
            file_type = "pdf"
            target_folder = folder_paths['pdf_folder']
            if not filename.endswith('.pdf'):
                filename += '.pdf'
        elif any(ext in doc_url.lower() for ext in ['.doc', '.docx', 'msword', 'officedocument']):
            file_type = "doc"
            target_folder = folder_paths['doc_folder']
            if not any(filename.endswith(ext) for ext in ['.doc', '.docx']):
                filename += '.docx'
        elif '.csv' in doc_url.lower() or 'spreadsheet' in doc_url.lower():
            file_type = "csv"
            target_folder = folder_paths['csv_folder']
            if not filename.endswith('.csv'):
                filename += '.csv'
        else:
            # Default to PDF when the URL gives no hint
            file_type = "pdf"
            target_folder = folder_paths['pdf_folder']
            filename += '.pdf'

        # Browser-like headers; some of these sites refuse bare requests
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Connection": "keep-alive",
            "Referer": doc_url
        }

        response = requests.get(doc_url, headers=headers, timeout=30)
        response.raise_for_status()

        print(f"Downloaded document size: {len(response.content)} bytes")
        print(f"Content-Type header: {response.headers.get('Content-Type', 'None')}")

        # Refine the file type using the Content-Type header
        content_type = response.headers.get('Content-Type', '').lower()

        if 'pdf' in content_type:
            file_type = "pdf"
            if not filename.endswith('.pdf'):
                filename = filename.rsplit('.', 1)[0] + '.pdf'
        elif any(doc_type in content_type for doc_type in ['word', 'msword', 'officedocument', 'doc']):
            file_type = "doc"
            if not any(filename.endswith(ext) for ext in ['.doc', '.docx']):
                filename = filename.rsplit('.', 1)[0] + '.docx'
        elif any(csv_type in content_type for csv_type in ['csv', 'spreadsheet', 'excel', 'text/plain']):
            file_type = "csv"
            if not filename.endswith('.csv'):
                filename = filename.rsplit('.', 1)[0] + '.csv'
        elif 'octet-stream' in content_type:
            # Generic binary response: sniff the first bytes of the payload
            try:
                if len(response.content) >= 5 and response.content[:5] == b'%PDF-':
                    print("Detected PDF signature in content")
                    file_type = "pdf"
                    if not filename.endswith('.pdf'):
                        filename = filename.rsplit('.', 1)[0] + '.pdf'
                elif len(response.content) > 100:
                    # Crude CSV heuristic: plenty of commas and several lines
                    sample = response.content[:1000].decode('utf-8', errors='ignore')
                    if sample.count(',') > 5 and sample.count('\n') > 2:
                        print("Content appears to be CSV based on commas and newlines")
                        file_type = "csv"
                        if not filename.endswith('.csv'):
                            filename = filename.rsplit('.', 1)[0] + '.csv'
            except Exception as e:
                print(f"Error analyzing file content: {str(e)}")

        print(f"Final determined file type: {file_type}")

        # Re-select the target folder in case the type changed after sniffing
        if file_type == "pdf":
            target_folder = folder_paths['pdf_folder']
        elif file_type == "doc":
            target_folder = folder_paths['doc_folder']
        elif file_type == "csv":
            target_folder = folder_paths['csv_folder']

        local_path = os.path.join(target_folder, filename)
        with open(local_path, 'wb') as f:
            f.write(response.content)

        print(f"Downloaded {file_type.upper()} file: {filename} ({len(response.content)} bytes)")

        return local_path, file_type

    except Exception as e:
        print(f"Error downloading document {doc_url}: {str(e)}")
        return None, None
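

# Illustrative usage sketch (the URL is a placeholder assumption):
#
#     folders = create_archive_folders("ReliefWeb")
#     local_path, file_type = download_document(
#         "https://example.org/reports/situation-report.pdf", folders
#     )
#     if local_path:  # (None, None) is returned when the download fails
#         print(f"Saved {file_type} to {local_path}")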


def extract_pdf_text_from_file(file_path: str) -> str:
    """
    Extract text from a local PDF file using multiple methods for better compatibility.
    """
    # Imported inside the function so document_scraper is only required when
    # PDF text extraction is actually used
    from document_scraper import extract_text_from_pdf_file
    return extract_text_from_pdf_file(file_path)


def process_direct_document(url: str, source: str = None) -> list:
    """
    Process a direct document URL without scraping the website.

    Useful for direct PDF links when you only want to download the file and
    extract its text.
    """
    try:
        # Infer the source from the URL when one is not provided
        if source is None:
            if "reliefweb.int" in url:
                source = "ReliefWeb"
            elif "fscluster.org" in url:
                source = "FS Cluster"
            elif "mopnd.govsomaliland.org" in url:
                source = "MOPND Somaliland"
            elif "nbs.gov.so" in url:
                source = "NBS Somalia"
            elif "data.humdata.org" in url:
                source = "HDX Humanitarian Data Exchange"
            elif "logcluster.org" in url:
                source = "LogCluster"
            elif "fsnau.org" in url:
                source = "FSNau - Food Security and Nutrition Analysis Unit"
            elif "fews.net" in url:
                source = "FEWS NET"
            elif "icpac.net" in url:
                source = "ICPAC"
            elif "faoswalim.org" in url:
                source = "FAO SWALIM"
            else:
                source = "Unknown"

        folder_paths = create_archive_folders(source)

        # Guess the file type from the URL: extension first, then keywords
        url_lower = url.lower()
        if url_lower.endswith('.pdf'):
            file_type = "pdf"
        elif url_lower.endswith('.doc') or url_lower.endswith('.docx'):
            file_type = "doc"
        elif url_lower.endswith('.csv'):
            file_type = "csv"
        else:
            if 'pdf' in url_lower or 'document' in url_lower or 'report' in url_lower:
                file_type = "pdf"
            elif 'csv' in url_lower or 'data' in url_lower or 'dataset' in url_lower or 'export' in url_lower:
                file_type = "csv"
            elif 'doc' in url_lower:
                file_type = "doc"
            else:
                file_type = "pdf"

        print(f"Detected file type from URL: {file_type}")

        filename = f"document_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        local_path, detected_type = download_document(url, folder_paths, filename)

        if not local_path:
            return [{
                "title": "Download Error",
                "date": datetime.now().strftime("%Y-%m-%d"),
                "source": source,
                "file_path": url,
                "extracted_text": f"Failed to download document: {url}",
                "file_type": "Error"
            }]

        # Extract text where supported; other types get a placeholder
        file_type = detected_type.upper() if detected_type else "UNKNOWN"
        if file_type == "PDF":
            extracted_text = extract_pdf_text_from_file(local_path)
        elif file_type == "DOC":
            extracted_text = f"Text from DOC file: {os.path.basename(local_path)}"
        elif file_type == "CSV":
            extracted_text = f"Data from CSV file: {os.path.basename(local_path)}"
        else:
            extracted_text = f"Content from {file_type} file: {os.path.basename(local_path)}"

        title = os.path.basename(url)

        return [{
            "title": title,
            "date": datetime.now().strftime("%Y-%m-%d"),
            "source": source,
            "file_path": local_path,
            "extracted_text": extracted_text,
            "file_type": file_type
        }]

    except Exception as e:
        return [{
            "title": f"Error processing document: {str(e)}",
            "date": datetime.now().strftime("%Y-%m-%d"),
            "source": "Error",
            "file_path": url,
            "extracted_text": f"Failed to process document URL: {url}",
            "file_type": "Error"
        }]
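

# Illustrative usage sketch (placeholder URL; domains not listed above fall
# back to source "Unknown"):
#
#     results = process_direct_document("https://example.org/files/report.pdf")
#     print(results[0]["file_type"], results[0]["file_path"])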


async def process_documents_from_url(url: str, extract_website_content: bool = True) -> list:
    """
    Process documents from a URL using the unified scraper with local PDF downloads.

    Parameters:
    - url: The URL to process
    - extract_website_content: If False, only download and extract documents
      without scraping website content

    Returns:
    - A list of document dictionaries
    """
    try:
        # In document-only mode, handle direct document URLs without scraping
        if not extract_website_content:
            if (url.lower().endswith('.pdf') or
                    url.lower().endswith('.doc') or
                    url.lower().endswith('.docx') or
                    url.lower().endswith('.csv')):
                print(f"Processing direct document URL with extension: {url}")
                return process_direct_document(url)

            # URLs that look like documents even without a file extension
            doc_indicators = [
                'download', 'file', 'document', 'attachment', 'pdf', 'doc', 'csv',
                'report', 'publication', 'data', 'dataset', 'export'
            ]

            if any(indicator in url.lower() for indicator in doc_indicators):
                print(f"URL appears to be a document without extension: {url}")
                print("Attempting direct document processing...")
                return process_direct_document(url)

        # Map the URL to a scraper configuration name and a display source
        if "reliefweb.int" in url:
            website_name = "reliefweb"
            source = "ReliefWeb"
        elif "fscluster.org" in url:
            website_name = "fscluster"
            source = "FS Cluster"
        elif "mopnd.govsomaliland.org" in url:
            website_name = "mopnd"
            source = "MOPND Somaliland"
        elif "nbs.gov.so" in url:
            website_name = "nbs"
            source = "NBS Somalia"
        elif "data.humdata.org" in url:
            website_name = "hdx"
            source = "HDX Humanitarian Data Exchange"
        elif "logcluster.org" in url:
            website_name = "logcluster"
            source = "LogCluster"
        elif "fsnau.org" in url:
            if "fsnau.org/publications" in url:
                website_name = "fsnau_publications"
                source = "FSNau Publications"
            else:
                website_name = "fsnau"
                source = "FSNau - Food Security and Nutrition Analysis Unit"
        elif "fews.net" in url:
            website_name = "fews"
            source = "FEWS NET - Famine Early Warning Systems Network"
        elif "icpac.net" in url:
            if "seasonal-forecast" in url.lower():
                website_name = "icpac_seasonal_forecast"
                source = "ICPAC - IGAD Climate Prediction and Applications Centre - Seasonal Forecast"
            else:
                website_name = "icpac"
                source = "ICPAC - IGAD Climate Prediction and Applications Centre"
        elif "frrims.faoswalim.org" in url:
            website_name = "faoswalim_frrims_river_levels"
            source = "FAO SWALIM FRRIMS River Levels"
        elif "faoswalim.org" in url:
            if "water/water-publications" in url or "water-publications" in url:
                website_name = "faoswalim_water_publications"
                source = "FAO SWALIM Water Publications"
            elif "flood-watch-bulletin" in url or "ag-document-type/flood-watch-bulletin" in url:
                website_name = "faoswalim_flood_watch"
                source = "FAO SWALIM Flood Watch"
            elif "faoswalim.org/swalim-events" in url:
                website_name = "faoswalim_events"
                source = "FAO SWALIM Events"
            elif "faoswalim.org/swalim-journals" in url:
                website_name = "faoswalim_journals"
                source = "FAO SWALIM Journals"
            elif "faoswalim.org/swalim-publications" in url:
                website_name = "faoswalim_publications"
                source = "FAO SWALIM Publications"
            elif "faoswalim.org/swalim-articles" in url:
                website_name = "faoswalim_articles"
                source = "FAO SWALIM Articles"
            else:
                website_name = "faoswalim"
                source = "FAO SWALIM - Somalia Water and Land Information Management"
        elif "drought.emergency.copernicus.eu" in url:
            website_name = "copernicus_drought"
            source = "Copernicus Drought Observatory"
        else:
            website_name = "unknown"
            source = "Unknown"

        folder_paths = create_archive_folders(source)

        if extract_website_content:
            print("Scraping website content...")
            articles = await scrape_news_async(url, website_name, force_mode="document")
        else:
            # PDF-only mode for sites configured for direct PDF downloads
            pdf_websites = get_pdf_websites()
            if website_name in pdf_websites:
                print(f"Directly downloading PDFs from {website_name} page without extracting website content...")

                # NOTE: imported for direct downloads but not used in this branch yet
                from document_scraper import download_and_save_pdf

                return [{
                    "title": f"PDF-Only Mode for {source}",
                    "date": datetime.now().strftime("%Y-%m-%d"),
                    "source": source,
                    "file_path": url,
                    "extracted_text": "PDF-only mode requested. Please use the direct document URL to download specific PDFs.",
                    "file_type": "Info"
                }]
            else:
                print("PDF-only mode requested but this site isn't configured for direct PDF downloads.")
                print("Falling back to normal website scraping...")
                articles = await scrape_news_async(url, website_name, force_mode="document")

        # Normalize scraped articles into document dictionaries
        documents = []
        for i, article in enumerate(articles):
            doc_path = article.get("pdf_path", "") or article.get("local_path", "")
            local_doc_path = article.get("local_file_path", "") or article.get("local_path", "")

            if not local_doc_path and doc_path:
                local_doc_path = doc_path

            print(f"Processing article {i+1}:")
            print(f"  Original doc_path: {doc_path}")
            print(f"  Local path: {local_doc_path}")

            extracted_text = article.get("content", "") or article.get("extracted_text", "No content")
            file_type = article.get("file_type", "Web Content")

            if doc_path:
                try:
                    # Already-archived files are used as-is
                    if doc_path.startswith("archive/") or doc_path.startswith("/") or os.path.exists(doc_path):
                        print(f"Using already archived file: {doc_path}")
                        local_doc_path = doc_path

                        if doc_path.lower().endswith(".pdf"):
                            file_type = "PDF"
                            extracted_text = article.get("content", "") or article.get("extracted_text", "No content")
                        elif doc_path.lower().endswith((".doc", ".docx")):
                            file_type = "DOC"
                            if not extracted_text or extracted_text == "No content":
                                extracted_text = f"Text from DOC file: {os.path.basename(doc_path)}"
                        elif doc_path.lower().endswith(".csv"):
                            file_type = "CSV"
                            if not extracted_text or extracted_text == "No content":
                                extracted_text = f"Data from CSV file: {os.path.basename(doc_path)}"
                        else:
                            file_type = "PDF"
                    else:
                        # Otherwise download the document into the archive
                        filename = f"document_{i+1}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                        local_doc_path, detected_type = download_document(doc_path, folder_paths, filename)

                        if local_doc_path:
                            file_type = detected_type.upper() if detected_type else "PDF"

                            if file_type == "PDF":
                                extracted_text = extract_pdf_text_from_file(local_doc_path)
                            elif file_type == "DOC":
                                extracted_text = f"Text from DOC file: {os.path.basename(local_doc_path)}"
                            elif file_type == "CSV":
                                extracted_text = f"Data from CSV file: {os.path.basename(local_doc_path)}"
                            else:
                                extracted_text = f"Content from {file_type} file: {os.path.basename(local_doc_path)}"
                        else:
                            file_type = "Web Content"
                            local_doc_path = doc_path
                except Exception as e:
                    print(f"Error processing document for article {i+1}: {str(e)}")
                    file_type = "Web Content"
                    local_doc_path = doc_path
            else:
                file_type = "Web Content"

            # CSV files: make sure there is at least placeholder text and a path
            if file_type == "CSV":
                if not extracted_text or extracted_text == "No content":
                    csv_file_name = os.path.basename(local_doc_path) if local_doc_path else article.get("title", "CSV File")
                    extracted_text = f"CSV File: {csv_file_name}\nFile Path: {local_doc_path or 'Not available'}\n(CSV file downloaded successfully)"

                if not local_doc_path:
                    local_doc_path = article.get("local_path", "") or article.get("pdf_path", "")

            document = {
                "title": article.get("title", "No title"),
                "date": article.get("date", datetime.now().strftime("%Y-%m-%d")),
                "source": source,
                "file_path": local_doc_path if local_doc_path else article.get("pdf_path", "") or article.get("local_path", ""),
                "extracted_text": extracted_text,
                "file_type": file_type
            }

            if file_type == "CSV" and not document["file_path"]:
                document["file_path"] = article.get("url", "")
                print(f"⚠️ CSV file path not found, using URL: {document['file_path']}")

            if document["file_path"] and not document["file_path"].startswith(("http://", "https://")) and "pdf" in document["file_type"].lower():
                document["file_type"] = "PDF"
                print(f"Confirmed PDF document with local path: {document['file_path']}")

            if file_type == "CSV":
                print(f"✅ CSV file will be included: {document['title']} at {document['file_path']}")

            print(f"Document {i+1}:")
            print(f"  Title: {document['title']}")
            print(f"  File Path: {document['file_path']}")
            print(f"  File Type: {document['file_type']}")
            print(f"  Text Length: {len(document['extracted_text'])} chars")
            documents.append(document)

        return documents

    except Exception as e:
        return [{
            "title": f"Error processing documents: {str(e)}",
            "date": datetime.now().strftime("%Y-%m-%d"),
            "source": "Error",
            "file_path": "",
            "extracted_text": f"Failed to process URL: {url}",
            "file_type": "Error"
        }]
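

# Minimal manual smoke test; a sketch only. The URL below is just a placeholder
# example of a ReliefWeb listing page, not a required target.
if __name__ == "__main__":
    import asyncio

    demo_url = "https://reliefweb.int/updates"  # placeholder
    docs = asyncio.run(process_documents_from_url(demo_url, extract_website_content=True))
    for doc in docs:
        print(doc["title"], "-", doc["file_type"], "-", doc["file_path"])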