""" |
|
|
Text Scraper - Handles article and text content processing |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import logging |
|
|
import re |
|
|
from datetime import datetime |
|
|
from typing import List, Dict, Any |
|
|
import time |
|
|
|
|
|
from scraper_common import ( |
|
|
WEBSITE_CONFIG, MAX_ARTICLE_LIMIT, MAX_PAGE_LIMIT, |
|
|
convert_to_absolute_url, scraping_cancelled |
|
|
) |
|
|
|
|
|
|
|
|
from keyword_filter import get_category_for_text |
|
|
|
|
|
|
|
|
from date_filter import is_date_in_range, standardize_date |
|
|
|
|
|
|
|
|
logging.basicConfig( |
|
|
level=logging.INFO, |
|
|
format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s' |
|
|
) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
def construct_navigation_url(base_url: str, nav_addition: str) -> str:
    """
    Construct a navigation URL by properly handling trailing slashes and query parameters.
    """
    # Drop any trailing slash so the addition can be joined cleanly.
    base_url = base_url.rstrip('/')

    if nav_addition.startswith(('/', '?')):
        # Path segments and query strings can be appended directly.
        return base_url + nav_addition
    # Anything else is joined with an explicit slash.
    return base_url + '/' + nav_addition
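
# Quick illustration of the join rules above; the URLs are placeholders, not
# real site configuration:
#   construct_navigation_url("https://example.com/news/", "?page=2")
#       -> "https://example.com/news?page=2"
#   construct_navigation_url("https://example.com/news", "page/2")
#       -> "https://example.com/news/page/2"
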
# Module-level cache mapping article URL -> raw date text captured from MOPND
# listing pages, used when the article page itself does not expose a date.
mopnd_article_dates = {}
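
# For reference, a config entry consumed by this module is expected to look
# roughly like the sketch below. The keys are the ones read in this file; the
# selector values are illustrative placeholders, not the real WEBSITE_CONFIG
# entries.
#   {
#       "article_links": "div.news-list a.title",      # or "page_links"
#       "title": "h1.entry-title",
#       "content": "div.entry-content p",
#       "date": "span.post-date",
#       "navigation_selector": "a.next-page",
#       "navigation_url_addition": "?page={page_no}",
#       "start_page": 1,
#   }
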
async def get_article_links_with_dates_from_page(page, config: dict, website_type: str) -> List[str]:
    """
    Get article links with dates from a single page (used for MOPND).
    """
    try:
        logger.info(f"Extracting article links with dates from page for {website_type}")

        article_selector = config.get("article_links") or config.get("page_links")
        if not article_selector:
            logger.warning("No article_links or page_links selector found in config")
            return []

        date_selector = config.get("date")
        if not date_selector:
            logger.warning("No date selector found in config")
            return []

        link_elements = await page.query_selector_all(article_selector)
        logger.info(f"Found {len(link_elements)} article link elements")

        date_elements = await page.query_selector_all(date_selector)
        logger.info(f"Found {len(date_elements)} date elements")

        article_links = []
        for i, link_element in enumerate(link_elements):
            try:
                href = await link_element.get_attribute("href")
                if href:
                    absolute_url = convert_to_absolute_url(href, page.url)
                    article_links.append(absolute_url)

                    # Pair the link with the date element at the same index, if any.
                    if i < len(date_elements):
                        try:
                            date_text = await date_elements[i].text_content()
                            if date_text and date_text.strip():
                                mopnd_article_dates[absolute_url] = date_text.strip()
                                logger.debug(f"Stored date for {absolute_url}: {date_text.strip()}")
                        except Exception as e:
                            logger.debug(f"Could not extract date for link {i}: {str(e)}")
            except Exception as e:
                logger.warning(f"Error extracting link {i}: {str(e)}")
                continue

        logger.info(f"Extracted {len(article_links)} article links with dates")
        return article_links

    except Exception as e:
        logger.error(f"Error extracting article links with dates: {str(e)}")
        return []
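
# Pairing sketch: links and dates are matched by DOM order, so with link
# elements [a0, a1, a2] and date elements [d0, d1] the cache stores a0 -> d0
# and a1 -> d1, while a2 is still returned as a link without a stored date.
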
async def get_all_article_links_unified(page, url: str, config: dict, website_type: str = None) -> List[str]:
    """
    Get article links from multiple pages with pagination support.
    Stops when no new (non-repeating) articles are found.
    """
    try:
        logger.info(f"Getting article links from: {url}")
        logger.info(f"Website type: {website_type}")

        navigation_selector = config.get("navigation_selector")
        navigation_url_addition = config.get("navigation_url_addition")
        start_page = config.get("start_page", 1)

        all_article_links = []
        seen_links = set()
        current_page = start_page
        consecutive_empty_pages = 0
        max_consecutive_empty = 2

        await page.goto(url, wait_until="domcontentloaded", timeout=30000)

        if navigation_selector and navigation_url_addition:
            logger.info(f"Navigation configured: selector={navigation_selector}, url_addition={navigation_url_addition}")
            logger.info(f"Starting from page: {start_page}")

            while True:
                logger.info(f"Processing page {current_page}")

                if MAX_PAGE_LIMIT is not None and current_page > MAX_PAGE_LIMIT:
                    logger.info(f"Reached MAX_PAGE_LIMIT ({MAX_PAGE_LIMIT}), stopping pagination")
                    break

                # The first page was loaded above; navigate for subsequent pages.
                if current_page > start_page:
                    nav_url_addition = navigation_url_addition.replace("{page_no}", str(current_page))
                    nav_url = construct_navigation_url(url, nav_url_addition)
                    logger.info(f"Navigating to: {nav_url}")
                    await page.goto(nav_url, wait_until="domcontentloaded", timeout=30000)

                # Check whether a navigation element is still present.
                nav_element = await page.query_selector(navigation_selector)
                if current_page == start_page and nav_element:
                    logger.info("Navigation element found, more pages available")
                elif current_page > start_page and not nav_element:
                    logger.info("No more navigation elements found, stopping pagination")
                    break

                page_links = await extract_links_from_current_page(page, config, website_type)

                if page_links:
                    # Keep only links not already seen on earlier pages.
                    new_links = []
                    for link in page_links:
                        if link not in seen_links:
                            seen_links.add(link)
                            new_links.append(link)

                    if new_links:
                        all_article_links.extend(new_links)
                        consecutive_empty_pages = 0
                        logger.info(f"Found {len(new_links)} new links on page {current_page} (total: {len(page_links)} links on page)")
                    else:
                        consecutive_empty_pages += 1
                        logger.info(f"No new links found on page {current_page} (all {len(page_links)} links were duplicates)")

                        if consecutive_empty_pages >= max_consecutive_empty:
                            logger.info(f"Stopping pagination: {consecutive_empty_pages} consecutive pages with no new content")
                            break
                else:
                    consecutive_empty_pages += 1
                    logger.info(f"No links found on page {current_page}")

                    if consecutive_empty_pages >= max_consecutive_empty:
                        logger.info(f"Stopping pagination: {consecutive_empty_pages} consecutive pages with no content")
                        break

                current_page += 1

        else:
            logger.info("No navigation configured - scraping single page only")
            page_links = await extract_links_from_current_page(page, config, website_type)
            all_article_links.extend(page_links)

        logger.info(f"Total unique article links found across all pages: {len(all_article_links)}")
        return all_article_links

    except Exception as e:
        logger.error(f"Error getting article links: {str(e)}")
        return []
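
# Pagination sketch (hypothetical config values): with
#   navigation_url_addition = "?page={page_no}"
# and current_page = 3, the loop above requests
#   construct_navigation_url(url, "?page=3")   # e.g. "https://example.com/news?page=3"
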
async def extract_links_from_current_page(page, config: dict, website_type: str) -> List[str]:
    """
    Extract article links from the current page.
    """
    try:
        # MOPND listing pages carry their dates, so they use the date-aware extractor.
        if website_type == "mopnd":
            return await get_article_links_with_dates_from_page(page, config, website_type)

        article_selector = config.get("article_links") or config.get("page_links")
        if not article_selector:
            logger.warning("No article_links or page_links selector found in config")
            return []

        # The selector may be configured as a single string or a list of
        # selectors; only the first list entry is used here.
        if isinstance(article_selector, list):
            article_selector = article_selector[0]
            logger.info(f"Using first selector from list: {article_selector}")
        elif not isinstance(article_selector, str):
            logger.error(f"Invalid selector type: {type(article_selector)}. Expected string or list.")
            return []

        link_elements = await page.query_selector_all(article_selector)
        logger.info(f"Found {len(link_elements)} article link elements on current page")

        page_links = []
        for i, link_element in enumerate(link_elements):
            try:
                href = await link_element.get_attribute("href")

                # If the matched element has no href, look for a nested <a>.
                if not href:
                    nested_link = await link_element.query_selector("a")
                    if nested_link:
                        href = await nested_link.get_attribute("href")

                # Still nothing: walk up the DOM (in the page context) looking
                # for an ancestor <a> that carries an href.
                if not href:
                    try:
                        parent_link = await link_element.evaluate("""
                            (element) => {
                                let current = element;
                                for (let i = 0; i < 5; i++) {
                                    if (current.tagName === 'A' && current.href) {
                                        return current.href;
                                    }
                                    current = current.parentElement;
                                    if (!current) break;
                                }
                                return null;
                            }
                        """)
                        if parent_link:
                            href = parent_link
                    except Exception as e:
                        logger.debug(f"Could not find parent link: {e}")

                if href:
                    absolute_url = convert_to_absolute_url(href, page.url)
                    page_links.append(absolute_url)
                else:
                    logger.warning(f"No href found for element {i}")
            except Exception as e:
                logger.warning(f"Error extracting link {i}: {str(e)}")
                continue

        return page_links

    except Exception as e:
        logger.error(f"Error extracting links from current page: {str(e)}")
        return []
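
# Href resolution order used above (the markup is illustrative only):
#   1. the matched element's own href attribute;
#   2. a nested <a> inside the matched element, e.g.
#      <div class="card"><a href="/story/1">...</a></div>;
#   3. an ancestor <a> found by the JavaScript walk (up to 5 levels up).
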
async def extract_all_articles_unified(page, article_links: List[str], config: dict, website_type: str = None, custom_keywords: str = "", start_date: str = None, end_date: str = None) -> List[dict]:
    """
    Unified function to extract content from all articles.
    Limited by MAX_ARTICLE_LIMIT if set.
    """
    logger.info(f"Starting article extraction for {len(article_links)} articles")
    logger.debug(f"Website type: {website_type}, Article limit: {MAX_ARTICLE_LIMIT}")

    all_articles = []

    if MAX_ARTICLE_LIMIT is not None and len(article_links) > MAX_ARTICLE_LIMIT:
        logger.info(f"Limiting to first {MAX_ARTICLE_LIMIT} articles out of {len(article_links)} total")
        article_links = article_links[:MAX_ARTICLE_LIMIT]

    logger.info(f"Processing {len(article_links)} articles")

    for i, link in enumerate(article_links):
        if scraping_cancelled():
            logger.info("Scraping cancelled, stopping article extraction")
            break

        logger.info(f"Processing article {i+1}/{len(article_links)}: {link}")

        try:
            # First attempt with a 60-second budget, then one retry with a
            # shorter 30-second budget before giving up on the article.
            try:
                article_data = await asyncio.wait_for(
                    extract_article_content_unified(page, link, config, website_type, custom_keywords, start_date, end_date),
                    timeout=60
                )
                if article_data is not None:
                    all_articles.append(article_data)
                else:
                    logger.info(f"Skipped article {i+1} (no content, no keyword match, or date out of range): {link}")
            except asyncio.TimeoutError:
                logger.warning(f"First attempt timed out for article {i+1}, retrying with a shorter timeout...")
                try:
                    article_data = await asyncio.wait_for(
                        extract_article_content_unified(page, link, config, website_type, custom_keywords, start_date, end_date),
                        timeout=30
                    )
                    if article_data is not None:
                        all_articles.append(article_data)
                    else:
                        logger.info(f"Skipped article {i+1} (no content, no keyword match, or date out of range): {link}")
                except asyncio.TimeoutError:
                    logger.error(f"Timeout extracting article {i+1} after retry: {link}")
                    all_articles.append({
                        "title": f"Timeout extracting article {i+1}",
                        "content": f"Article extraction timed out after multiple attempts: {link}",
                        "date": datetime.now().strftime("%Y-%m-%d"),
                        "url": link
                    })
            except Exception as e:
                logger.error(f"Error extracting article {i+1}: {str(e)}")
                all_articles.append({
                    "title": f"Error extracting article {i+1}",
                    "content": f"Error extracting article: {str(e)}",
                    "date": datetime.now().strftime("%Y-%m-%d"),
                    "url": link
                })
        except Exception as e:
            logger.error(f"Unexpected error processing article {i+1}: {str(e)}")
            all_articles.append({
                "title": f"Error processing article {i+1}",
                "content": f"Unexpected error: {str(e)}",
                "date": datetime.now().strftime("%Y-%m-%d"),
                "url": link
            })

    return all_articles
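
# Each successfully extracted article is returned as a dict shaped like the
# illustration below (values are placeholders, not real scraped data):
#   {
#       "title": "Example headline",
#       "content": "Cleaned article text...",
#       "date": "2024-01-31",              # standardized to YYYY-MM-DD
#       "url": "https://example.com/news/12345",
#       "category": "some-category",       # from get_category_for_text()
#   }
# The error/timeout placeholders appended above carry the same keys except
# "category".
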
async def extract_article_content_unified(page, article_url: str, config: dict, website_type: str = None, custom_keywords: str = "", start_date: str = None, end_date: str = None) -> dict:
    """
    Unified function to extract content from a single article (text-focused),
    with up to 5 retry attempts for loading the page.
    Returns None when the article is filtered out by date range or keywords.
    """
    try:
        max_retries = 5
        retry_count = 0

        while retry_count < max_retries:
            try:
                retry_count += 1
                logger.info(f"Loading article (attempt {retry_count}/{max_retries}): {article_url}")

                # Progressively relax the load strategy on each retry.
                if retry_count == 1:
                    await page.goto(article_url, wait_until="domcontentloaded", timeout=30000)
                elif retry_count == 2:
                    await page.goto(article_url, timeout=20000)
                elif retry_count == 3:
                    await page.goto(article_url, wait_until="networkidle", timeout=15000)
                else:
                    await page.goto(article_url, timeout=10000)

                logger.info(f"Successfully loaded article on attempt {retry_count}")
                break

            except Exception as e:
                logger.warning(f"Attempt {retry_count} failed for {article_url}: {str(e)}")

                if retry_count >= max_retries:
                    logger.error(f"Failed to load article after {max_retries} attempts: {article_url}")
                    return {
                        "title": "Network Error",
                        "content": f"Failed to access article after {max_retries} attempts: {str(e)}",
                        "date": datetime.now().strftime("%Y-%m-%d"),
                        "url": article_url
                    }

        # Give late-loading content a moment to render.
        await asyncio.sleep(2)

        # Title
        title = ""
        try:
            title_element = await page.query_selector(config.get("title"))
            if title_element:
                title = await title_element.text_content()
                if title:
                    title = title.strip()
        except Exception as e:
            logger.warning(f"Error extracting title: {str(e)}")
            title = ""

        # Infer the website type from the config if it was not provided.
        if website_type is None:
            for site_type, site_config in WEBSITE_CONFIG.items():
                if site_config == config:
                    website_type = site_type
                    break
            if website_type is None:
                website_type = "unknown"

        # Content
        content = ""

        if website_type == "hiiraan":
            # Hiiraan article bodies mix in scripts and inline ads, so the raw
            # HTML is cleaned with regexes before the tags are stripped.
            content_selector = config.get("content")
            try:
                content_element = await page.query_selector(content_selector)
                if content_element:
                    html_content = await content_element.inner_html()
                    html_content = re.sub(r'<script.*?</script>', '', html_content, flags=re.DOTALL)
                    html_content = re.sub(r'<div class="inline-ad">.*?</div>', '', html_content, flags=re.DOTALL)
                    # Strip the remaining tags and collapse whitespace.
                    content = re.sub(r'<.*?>', ' ', html_content)
                    content = re.sub(r'\s+', ' ', content).strip()
            except Exception as e:
                logger.warning(f"Error extracting hiiraan content: {str(e)}")
                content = ""
        else:
            content_selector = config.get("content")
            try:
                content_elements = await page.query_selector_all(content_selector)
                content_parts = []
                for element in content_elements:
                    text = await element.text_content()
                    if text:
                        content_parts.append(text.strip())
                content = "\n\n".join(content_parts)
            except Exception as e:
                logger.warning(f"Error extracting content: {str(e)}")
                content = ""

        # Date: prefer the date captured from the MOPND listing page, otherwise
        # fall back to the per-article date selector.
        date_raw = ""
        if website_type == "mopnd" and article_url in mopnd_article_dates:
            date_raw = mopnd_article_dates[article_url]
            logger.debug(f"Using MOPND date from main page: {date_raw}")
        else:
            date_selector = config.get("date")
            if date_selector:
                try:
                    date_element = await page.query_selector(date_selector)
                    if date_element:
                        date_raw = await date_element.text_content()
                        if date_raw:
                            date_raw = date_raw.strip()
                            logger.debug(f"Extracted raw date: {date_raw}")
                except Exception as e:
                    logger.warning(f"Error extracting date with selector {date_selector}: {str(e)}")

        date = standardize_date(date_raw, default_to_current=True)
        if not date:
            date = datetime.now().strftime("%Y-%m-%d")
            logger.info(f"No date found with config selector, using current date: {date}")

        # Date-range filtering
        from date_filter import parse_date_input
        start_dt = parse_date_input(start_date) if start_date else None
        end_dt = parse_date_input(end_date) if end_date else None

        if start_dt is not None or end_dt is not None:
            if not is_date_in_range(date, start_dt, end_dt, include_missing=False):
                logger.info(f"Article date {date} is outside date range [{start_date}, {end_date}] - filtering out")
                return None

        # Keyword-based categorization; articles with no matching category are dropped.
        combined_text = f"{title} {content}".strip()
        category = get_category_for_text(combined_text, custom_keywords)

        if category is None:
            logger.info("Article did not match any keyword categories - filtering out")
            return None
        elif category:
            logger.info(f"Article categorized as: {category}")
        else:
            logger.info("Article kept with empty category")

        result = {
            "title": title or "No title found",
            "content": content or "No content found",
            "date": date,
            "url": article_url,
            "category": category
        }

        logger.info(f"Article result: title='{result['title'][:50]}...', category='{category}'")
        return result

    except Exception as e:
        logger.error(f"Error extracting content from {article_url}: {str(e)}")
        return {
            "title": "Error",
            "content": f"Error extracting content: {str(e)}",
            "date": datetime.now().strftime("%Y-%m-%d"),
            "url": article_url
        }
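

if __name__ == "__main__":
    # Minimal manual smoke test (sketch only). It assumes Playwright's async
    # API is the browser driver used by callers of this module; the site key
    # and start URL below are hypothetical placeholders, not real configuration.
    from playwright.async_api import async_playwright

    async def _demo():
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            page = await browser.new_page()
            config = WEBSITE_CONFIG.get("example_site", {})  # hypothetical key
            links = await get_all_article_links_unified(
                page, "https://example.com/news", config, website_type="example_site"
            )
            articles = await extract_all_articles_unified(page, links, config)
            logger.info(f"Scraped {len(articles)} articles")
            await browser.close()

    asyncio.run(_demo())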