# legal_scraper_interface.py
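"""
Enhanced Iranian legal-document scraper with Persian NLP support.

Fetches pages from a predefined list of Iranian legal and news sources,
enriches each document with keyword extraction, rule-based classification,
importance scoring, legal-entity extraction, extractive summarization and
(optionally) ParsBERT embeddings, stores the results in SQLite, and offers
similarity/text search plus CSV export.
"""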
import requests
import time
import json
import csv
import sqlite3
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union
from dataclasses import dataclass, asdict
from pathlib import Path
import re
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from bs4 import BeautifulSoup
try:
import torch
from transformers import AutoTokenizer, AutoModel
TORCH_AVAILABLE = True
except ImportError:
TORCH_AVAILABLE = False
print("⚠️ PyTorch not available, running without advanced NLP features")
try:
import hazm
from hazm import Normalizer, word_tokenize, sent_tokenize
HAZM_AVAILABLE = True
except ImportError:
HAZM_AVAILABLE = False
print("⚠️ Hazm not available, using basic text processing")
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('legal_scraper.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Predefined Iranian legal and news sources
IRANIAN_LEGAL_SOURCES = [
"https://www.irna.ir", # خبرگزاری جمهوری اسلامی
"https://www.tasnimnews.com", # خبرگزاری تسنیم
"https://www.mehrnews.com", # خبرگزاری مهر
"https://www.farsnews.ir", # خبرگزاری فارس
"https://iribnews.ir", # خبرگزاری صدا و سیما
"https://www.dolat.ir", # پورتال دولت
"https://rc.majlis.ir", # مرکز پژوهش‌های مجلس
]
@dataclass
class LegalDocument:
"""Enhanced legal document with NLP features"""
title: str
content: str
source_url: str
document_type: str
date_published: Optional[str] = None
    date_scraped: Optional[str] = None
    category: Optional[str] = None
    tags: Optional[List[str]] = None
    summary: Optional[str] = None
    importance_score: float = 0.0
    sentiment_score: float = 0.0
    legal_entities: Optional[List[str]] = None
    keywords: Optional[List[str]] = None
    embedding: Optional[List[float]] = None
language: str = "fa"
def __post_init__(self):
if self.date_scraped is None:
self.date_scraped = datetime.now().isoformat()
if self.tags is None:
self.tags = []
if self.legal_entities is None:
self.legal_entities = []
if self.keywords is None:
self.keywords = []
class PersianNLPProcessor:
"""Persian NLP processor using available models"""
def __init__(self):
if HAZM_AVAILABLE:
self.normalizer = Normalizer()
else:
self.normalizer = None
        self.device = torch.device('cpu') if TORCH_AVAILABLE else None
self.tokenizer = None
self.model = None
if TORCH_AVAILABLE:
try:
model_names = [
"HooshvareLab/bert-fa-base-uncased",
"HooshvareLab/bert-base-parsbert-uncased",
"distilbert-base-multilingual-cased"
]
for model_name in model_names:
try:
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModel.from_pretrained(model_name)
self.model.to(self.device)
logger.info(f"✅ Loaded model: {model_name}")
break
except Exception as e:
logger.warning(f"⚠️ Failed to load {model_name}: {e}")
continue
except Exception as e:
logger.error(f"❌ Failed to load any Persian BERT model: {e}")
self.legal_categories = {
'قانون': ['قانون', 'ماده', 'بند', 'فصل', 'تبصره', 'اصلاحیه'],
'رای': ['رای', 'حکم', 'دادگاه', 'قاضی', 'محکوم', 'دادرسی'],
'آیین‌نامه': ['آیین‌نامه', 'دستورالعمل', 'بخشنامه', 'مقررات'],
'اخبار': ['خبر', 'گزارش', 'اعلام', 'اطلاعیه', 'بیانیه'],
'نظریه': ['نظریه', 'تفسیر', 'استعلام', 'پاسخ', 'رأی']
}
self.tfidf = None
self._init_tfidf()
def _init_tfidf(self):
"""Initialize TF-IDF vectorizer"""
try:
self.tfidf = TfidfVectorizer(
max_features=1000,
stop_words=self._get_persian_stopwords(),
ngram_range=(1, 2),
min_df=1,
                max_df=1.0  # fitted on one document at a time, so df-based pruning would drop every term
)
except Exception as e:
logger.error(f"TF-IDF initialization failed: {e}")
def _get_persian_stopwords(self) -> List[str]:
"""Get Persian stopwords"""
return [
'در', 'به', 'از', 'که', 'این', 'آن', 'با', 'را', 'و', 'است',
'برای', 'تا', 'کرد', 'شد', 'می', 'خود', 'هم', 'نیز', 'یا', 'اما',
'اگر', 'چون', 'پس', 'بعد', 'قبل', 'روی', 'زیر', 'کنار', 'داخل',
'نیست', 'بود', 'باشد', 'کند', 'کنند', 'شود', 'گردد', 'دارد', 'دارند'
]
def normalize_text(self, text: str) -> str:
"""Normalize Persian text"""
if not text:
return ""
try:
text = re.sub(r'[^\w\s\u0600-\u06FF]', ' ', text)
text = re.sub(r'\s+', ' ', text)
if self.normalizer:
text = self.normalizer.normalize(text)
return text.strip()
except Exception as e:
logger.error(f"Text normalization failed: {e}")
return text.strip()
def extract_keywords(self, text: str, top_k: int = 10) -> List[str]:
"""Extract keywords using TF-IDF"""
try:
if not self.tfidf or not text:
return []
normalized_text = self.normalize_text(text)
if HAZM_AVAILABLE:
tokens = word_tokenize(normalized_text)
processed_text = ' '.join(tokens)
else:
processed_text = normalized_text
tfidf_matrix = self.tfidf.fit_transform([processed_text])
feature_names = self.tfidf.get_feature_names_out()
scores = tfidf_matrix.toarray()[0]
keyword_scores = list(zip(feature_names, scores))
keyword_scores.sort(key=lambda x: x[1], reverse=True)
return [kw[0] for kw in keyword_scores[:top_k] if kw[1] > 0]
except Exception as e:
logger.error(f"Keyword extraction failed: {e}")
return []
def classify_document(self, text: str) -> Tuple[str, float]:
"""Classify document type with confidence score"""
try:
normalized_text = self.normalize_text(text.lower())
scores = {}
for category, keywords in self.legal_categories.items():
score = 0
for keyword in keywords:
count = normalized_text.count(keyword)
score += count * (len(keyword) / 5)
if len(normalized_text) > 0:
scores[category] = score / (len(normalized_text) / 1000)
else:
scores[category] = 0
if not scores or max(scores.values()) == 0:
return "عمومی", 0.0
best_category = max(scores.items(), key=lambda x: x[1])
total_score = sum(scores.values())
confidence = min(best_category[1] / total_score, 1.0) if total_score > 0 else 0.0
return best_category[0], confidence
except Exception as e:
logger.error(f"Document classification failed: {e}")
return "عمومی", 0.0
def calculate_importance_score(self, doc: LegalDocument) -> float:
"""Calculate document importance score"""
try:
score = 0.0
title_lower = doc.title.lower()
high_importance_words = ['قانون', 'اساسی', 'حکم', 'رای', 'مصوبه']
medium_importance_words = ['آیین‌نامه', 'بخشنامه', 'دستورالعمل']
for word in high_importance_words:
if word in title_lower:
score += 0.3
break
for word in medium_importance_words:
if word in title_lower:
score += 0.2
break
content_length = len(doc.content)
if content_length > 5000:
score += 0.25
elif content_length > 2000:
score += 0.15
elif content_length > 500:
score += 0.1
if doc.date_published:
try:
date_formats = ['%Y-%m-%d', '%Y/%m/%d', '%d/%m/%Y']
pub_date = None
for fmt in date_formats:
try:
pub_date = datetime.strptime(doc.date_published, fmt)
break
                        except ValueError:
                            continue
if pub_date:
days_old = (datetime.now() - pub_date).days
if days_old < 30:
score += 0.25
elif days_old < 365:
score += 0.15
elif days_old < 1825:
score += 0.05
            except Exception:
                pass
legal_keywords = ['قانون', 'ماده', 'بند', 'حکم', 'رای', 'دادگاه', 'محکمه']
content_lower = doc.content.lower()
keyword_count = sum(content_lower.count(kw) for kw in legal_keywords)
word_count = len(doc.content.split())
if word_count > 0:
keyword_density = keyword_count / word_count
score += min(keyword_density * 5, 0.2)
type_bonuses = {
'law': 0.2,
'ruling': 0.15,
'regulation': 0.1,
'news': 0.05
}
score += type_bonuses.get(doc.document_type, 0)
return min(score, 1.0)
except Exception as e:
logger.error(f"Importance score calculation failed: {e}")
return 0.0
def extract_legal_entities(self, text: str) -> List[str]:
"""Extract legal entities from text"""
try:
entities = []
patterns = {
'قوانین': r'قانون\s+[\u0600-\u06FF\s]{3,30}',
'مواد': r'ماده\s+\d+[\u0600-\u06FF\s]*',
'دادگاه‌ها': r'دادگاه\s+[\u0600-\u06FF\s]{3,30}',
'مراجع': r'(وزارت|سازمان|اداره|شورای|کمیته)\s+[\u0600-\u06FF\s]{3,30}',
'احکام': r'(حکم|رای)\s+(شماره\s+)?\d+',
}
            for entity_type, pattern in patterns.items():
                # finditer + group(0) keeps the full match even when the pattern contains capture groups
                for match in re.finditer(pattern, text):
                    clean_match = re.sub(r'\s+', ' ', match.group(0).strip())
                    if 5 < len(clean_match) < 100:
                        entities.append(clean_match)
unique_entities = list(dict.fromkeys(entities))
return unique_entities[:15]
except Exception as e:
logger.error(f"Entity extraction failed: {e}")
return []
def get_text_embedding(self, text: str) -> Optional[List[float]]:
"""Get text embedding using available model"""
if not self.model or not self.tokenizer or not TORCH_AVAILABLE:
return None
try:
normalized_text = self.normalize_text(text)
if len(normalized_text) > 512:
normalized_text = normalized_text[:512]
if not normalized_text:
return None
inputs = self.tokenizer(
normalized_text,
return_tensors="pt",
padding=True,
truncation=True,
max_length=512
).to(self.device)
with torch.no_grad():
outputs = self.model(**inputs)
embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy()[0]
return embedding.tolist()
except Exception as e:
logger.error(f"Embedding generation failed: {e}")
return None
def generate_summary(self, text: str, max_length: int = 200) -> str:
"""Generate text summary"""
try:
if len(text) <= max_length:
return text
if HAZM_AVAILABLE:
sentences = sent_tokenize(text)
else:
sentences = re.split(r'[.!?]+', text)
sentences = [s.strip() for s in sentences if s.strip()]
if len(sentences) <= 2:
return text[:max_length] + "..." if len(text) > max_length else text
keywords = self.extract_keywords(text, top_k=15)
sentence_scores = []
for sentence in sentences:
if len(sentence) < 20:
continue
score = 0
sentence_lower = sentence.lower()
for kw in keywords:
if kw in sentence_lower:
score += 1
legal_terms = ['قانون', 'ماده', 'حکم', 'رای', 'دادگاه']
for term in legal_terms:
if term in sentence_lower:
score += 0.5
if len(sentence) > 200:
score *= 0.8
sentence_scores.append((sentence, score))
sentence_scores.sort(key=lambda x: x[1], reverse=True)
selected_sentences = []
current_length = 0
for sentence, score in sentence_scores:
if current_length + len(sentence) <= max_length:
selected_sentences.append(sentence)
current_length += len(sentence)
else:
break
if not selected_sentences:
return text[:max_length] + "..."
summary = ' '.join(selected_sentences)
return summary if len(summary) <= max_length else summary[:max_length] + "..."
except Exception as e:
logger.error(f"Summary generation failed: {e}")
return text[:max_length] + "..." if len(text) > max_length else text
def process_document(self, doc: LegalDocument) -> LegalDocument:
"""Process document with all available NLP features"""
try:
logger.info(f"Processing document: {doc.title[:50]}...")
doc.keywords = self.extract_keywords(doc.content)
doc_type, confidence = self.classify_document(doc.content)
if confidence > 0.3:
doc.category = doc_type
doc.importance_score = self.calculate_importance_score(doc)
doc.legal_entities = self.extract_legal_entities(doc.content)
doc.summary = self.generate_summary(doc.content)
doc.embedding = self.get_text_embedding(doc.content)
logger.info(f"✅ Processed: {doc.title[:30]}... (Score: {doc.importance_score:.2f})")
return doc
except Exception as e:
logger.error(f"Document processing failed: {e}")
return doc
class EnhancedLegalScraper:
"""Enhanced legal scraper with real web scraping and NLP"""
def __init__(self, delay: float = 1.0):
self.delay = delay
self.session = requests.Session()
try:
self.nlp_processor = PersianNLPProcessor()
logger.info("✅ NLP processor initialized")
except Exception as e:
logger.error(f"❌ NLP processor initialization failed: {e}")
self.nlp_processor = None
self.db_path = self._get_db_path()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'fa,en-US;q=0.7,en;q=0.3',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
})
self._init_database()
def _get_db_path(self) -> str:
"""Get appropriate database path for the environment"""
possible_paths = [
"/tmp/legal_scraper.db",
"./data/legal_scraper.db",
"legal_scraper.db"
]
for path in possible_paths:
try:
Path(path).parent.mkdir(parents=True, exist_ok=True)
return path
            except OSError:
                continue
return ":memory:"
def _init_database(self):
"""Initialize enhanced database with NLP fields"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS legal_documents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
content TEXT NOT NULL,
source_url TEXT UNIQUE NOT NULL,
document_type TEXT NOT NULL,
date_published TEXT,
date_scraped TEXT NOT NULL,
category TEXT,
tags TEXT,
summary TEXT,
importance_score REAL DEFAULT 0.0,
sentiment_score REAL DEFAULT 0.0,
legal_entities TEXT,
keywords TEXT,
embedding TEXT,
language TEXT DEFAULT 'fa',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
indexes = [
'CREATE INDEX IF NOT EXISTS idx_source_url ON legal_documents(source_url)',
'CREATE INDEX IF NOT EXISTS idx_document_type ON legal_documents(document_type)',
'CREATE INDEX IF NOT EXISTS idx_importance_score ON legal_documents(importance_score DESC)',
'CREATE INDEX IF NOT EXISTS idx_category ON legal_documents(category)',
'CREATE INDEX IF NOT EXISTS idx_date_published ON legal_documents(date_published)',
'CREATE INDEX IF NOT EXISTS idx_date_scraped ON legal_documents(date_scraped DESC)'
]
for index in indexes:
cursor.execute(index)
conn.commit()
conn.close()
logger.info(f"✅ Database initialized: {self.db_path}")
except Exception as e:
logger.error(f"❌ Database initialization failed: {e}")
raise
def save_document(self, doc: LegalDocument) -> bool:
"""Save enhanced document to database"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO legal_documents
(title, content, source_url, document_type, date_published,
date_scraped, category, tags, summary, importance_score,
sentiment_score, legal_entities, keywords, embedding, language)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
doc.title,
doc.content,
doc.source_url,
doc.document_type,
doc.date_published,
doc.date_scraped,
doc.category,
json.dumps(doc.tags, ensure_ascii=False) if doc.tags else None,
doc.summary,
doc.importance_score,
doc.sentiment_score,
json.dumps(doc.legal_entities, ensure_ascii=False) if doc.legal_entities else None,
json.dumps(doc.keywords, ensure_ascii=False) if doc.keywords else None,
json.dumps(doc.embedding) if doc.embedding else None,
doc.language
))
conn.commit()
conn.close()
return True
except Exception as e:
logger.error(f"Failed to save document {doc.source_url}: {e}")
return False
def get_enhanced_statistics(self) -> Dict:
"""Get comprehensive statistics with NLP insights"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
stats = {}
cursor.execute('SELECT COUNT(*) FROM legal_documents')
stats['total_documents'] = cursor.fetchone()[0]
cursor.execute('SELECT document_type, COUNT(*) FROM legal_documents GROUP BY document_type')
stats['by_type'] = dict(cursor.fetchall())
cursor.execute('SELECT category, COUNT(*) FROM legal_documents WHERE category IS NOT NULL GROUP BY category')
stats['by_category'] = dict(cursor.fetchall())
cursor.execute('SELECT COUNT(*) FROM legal_documents WHERE importance_score >= 0.7')
high_importance = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM legal_documents WHERE importance_score >= 0.3 AND importance_score < 0.7')
medium_importance = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM legal_documents WHERE importance_score < 0.3')
low_importance = cursor.fetchone()[0]
stats['importance_distribution'] = {
'high': high_importance,
'medium': medium_importance,
'low': low_importance
}
cursor.execute('SELECT keywords FROM legal_documents WHERE keywords IS NOT NULL')
all_keywords = []
for row in cursor.fetchall():
try:
keywords = json.loads(row[0])
all_keywords.extend(keywords)
                except (json.JSONDecodeError, TypeError):
                    continue
if all_keywords:
keyword_counts = {}
for kw in all_keywords:
keyword_counts[kw] = keyword_counts.get(kw, 0) + 1
top_keywords = sorted(keyword_counts.items(), key=lambda x: x[1], reverse=True)[:25]
stats['top_keywords'] = dict(top_keywords)
cursor.execute('''
SELECT DATE(date_scraped) as day, COUNT(*)
FROM legal_documents
WHERE date_scraped >= date('now', '-7 days')
GROUP BY DATE(date_scraped)
ORDER BY day DESC
''')
stats['recent_activity'] = dict(cursor.fetchall())
cursor.execute('''
SELECT document_type, AVG(importance_score)
FROM legal_documents
GROUP BY document_type
''')
stats['avg_importance_by_type'] = dict(cursor.fetchall())
cursor.execute('SELECT COUNT(*) FROM legal_documents WHERE embedding IS NOT NULL')
stats['documents_with_embeddings'] = cursor.fetchone()[0]
cursor.execute('SELECT language, COUNT(*) FROM legal_documents GROUP BY language')
stats['by_language'] = dict(cursor.fetchall())
conn.close()
return stats
except Exception as e:
logger.error(f"Statistics generation failed: {e}")
return {
'total_documents': 0,
'by_type': {},
'by_category': {},
'importance_distribution': {'high': 0, 'medium': 0, 'low': 0},
'top_keywords': {},
'recent_activity': {},
'avg_importance_by_type': {},
'documents_with_embeddings': 0,
'by_language': {}
}
def search_with_similarity(self, query: str, limit: int = 20) -> List[Dict]:
"""Advanced search using embeddings and similarity"""
if not self.nlp_processor or not self.nlp_processor.model:
return self._text_search(query, limit)
try:
query_embedding = self.nlp_processor.get_text_embedding(query)
if not query_embedding:
return self._text_search(query, limit)
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT id, title, content, source_url, document_type,
importance_score, summary, embedding
FROM legal_documents
WHERE embedding IS NOT NULL
''')
results = []
query_vector = np.array(query_embedding)
for row in cursor.fetchall():
try:
doc_embedding = json.loads(row[7])
doc_vector = np.array(doc_embedding)
similarity = cosine_similarity([query_vector], [doc_vector])[0][0]
combined_score = (similarity * 0.7) + (row[5] * 0.3)
results.append({
'id': row[0],
'title': row[1],
'content': row[2][:500] + "..." if len(row[2]) > 500 else row[2],
'source_url': row[3],
'document_type': row[4],
'importance_score': row[5],
'summary': row[6],
'similarity_score': similarity,
'combined_score': combined_score
})
except Exception as e:
logger.error(f"Error processing document embedding: {e}")
continue
results.sort(key=lambda x: x['combined_score'], reverse=True)
conn.close()
return results[:limit]
except Exception as e:
logger.error(f"Similarity search failed: {e}")
return self._text_search(query, limit)
def _text_search(self, query: str, limit: int = 20) -> List[Dict]:
"""Fallback text search"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
if self.nlp_processor:
normalized_query = self.nlp_processor.normalize_text(query)
else:
normalized_query = query
query_words = normalized_query.split()
search_conditions = []
params = []
for word in query_words:
search_conditions.append("(title LIKE ? OR content LIKE ?)")
params.extend([f'%{word}%', f'%{word}%'])
            if not search_conditions:
                conn.close()
                return []
            where_clause = " OR ".join(search_conditions)
cursor.execute(f'''
SELECT id, title, content, source_url, document_type,
importance_score, summary
FROM legal_documents
WHERE {where_clause}
ORDER BY importance_score DESC
LIMIT ?
''', params + [limit])
results = []
for row in cursor.fetchall():
results.append({
'id': row[0],
'title': row[1],
'content': row[2][:500] + "..." if len(row[2]) > 500 else row[2],
'source_url': row[3],
'document_type': row[4],
'importance_score': row[5],
'summary': row[6],
'similarity_score': 0.0
})
conn.close()
return results
except Exception as e:
logger.error(f"Text search failed: {e}")
return []
    def export_to_csv(self, filename: Optional[str] = None) -> str:
"""Export data to CSV with full details"""
try:
if not filename:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"legal_documents_{timestamp}.csv"
conn = sqlite3.connect(self.db_path)
query = '''
SELECT title, content, source_url, document_type,
date_published, date_scraped, category, summary,
importance_score, keywords, legal_entities
FROM legal_documents
ORDER BY importance_score DESC, date_scraped DESC
'''
df = pd.read_sql_query(query, conn)
conn.close()
for col in ['keywords', 'legal_entities']:
if col in df.columns:
df[col] = df[col].apply(lambda x: ', '.join(json.loads(x)) if x else '')
df.to_csv(filename, index=False, encoding='utf-8-sig')
logger.info(f"✅ Data exported to CSV: {filename}")
return filename
except Exception as e:
logger.error(f"CSV export failed: {e}")
return ""
    def scrape_real_sources(self, urls: Optional[List[str]] = None, max_docs: int = 20) -> List[LegalDocument]:
        """Real web scraping implementation with source-specific extraction"""
        urls = urls if urls is not None else IRANIAN_LEGAL_SOURCES
        documents = []
for i, url in enumerate(urls):
if len(documents) >= max_docs:
break
try:
logger.info(f"🔄 Scraping {i+1}/{len(urls)}: {url}")
time.sleep(self.delay)
response = self.session.get(url, timeout=15)
response.raise_for_status()
if response.encoding == 'ISO-8859-1':
response.encoding = response.apparent_encoding
soup = BeautifulSoup(response.content, 'html.parser')
# Extract documents using source-specific logic
extracted_items = self._extract_source_specific_content(soup, url, max_docs - len(documents))
for item in extracted_items:
if len(documents) >= max_docs:
break
doc = LegalDocument(
title=item['title'],
content=item['content'],
source_url=item['url'],
document_type=self._determine_document_type(item['title'], item['content']),
date_published=item['date']
)
if self.nlp_processor:
doc = self.nlp_processor.process_document(doc)
documents.append(doc)
logger.info(f"✅ Extracted: {doc.title[:50]}...")
except Exception as e:
logger.error(f"❌ Error scraping {url}: {e}")
continue
documents.sort(key=lambda x: x.importance_score, reverse=True)
return documents
def _extract_source_specific_content(self, soup: BeautifulSoup, url: str, max_items: int) -> List[Dict]:
"""Extract content based on source-specific selectors"""
if 'irna.ir' in url:
return self._extract_irna_content(soup, url, max_items)
elif 'tasnimnews.com' in url:
return self._extract_tasnim_content(soup, url, max_items)
elif 'mehrnews.com' in url:
return self._extract_mehr_content(soup, url, max_items)
elif 'farsnews.ir' in url:
return self._extract_fars_content(soup, url, max_items)
else:
return self._extract_generic_content(soup, url, max_items)
def _extract_irna_content(self, soup: BeautifulSoup, base_url: str, max_items: int) -> List[Dict]:
"""Extract content from IRNA"""
items = []
try:
articles = soup.select('.news-item, .article, .story')[:max_items]
for article in articles:
                title_elem = article.select_one('h1, h2, h3, .title, .headline, a')
if title_elem:
title = title_elem.get_text(strip=True)
content = article.get_text(strip=True)
if len(title) > 10 and len(content) > 100:
items.append({
'title': title,
'content': content,
'url': base_url,
'date': self._extract_date(soup)
})
if not items:
main_content = soup.select_one('main, .main-content, .content, article')
if main_content:
title = soup.select_one('h1, title')
title_text = title.get_text(strip=True) if title else "خبر ایرنا"
content_text = main_content.get_text(strip=True)
if len(content_text) > 200:
items.append({
'title': title_text,
'content': content_text,
'url': base_url,
'date': self._extract_date(soup)
})
except Exception as e:
logger.error(f"IRNA extraction error: {e}")
return items
def _extract_tasnim_content(self, soup: BeautifulSoup, base_url: str, max_items: int) -> List[Dict]:
"""Extract content from Tasnim"""
items = []
try:
articles = soup.select('.news-box, .item, .story-item')[:max_items]
for article in articles:
title_elem = article.select_one('h2, h3, .title, a')
if title_elem:
title = title_elem.get_text(strip=True)
content = article.get_text(strip=True)
if len(title) > 10 and len(content) > 100:
items.append({
'title': title,
'content': content,
'url': base_url,
'date': self._extract_date(soup)
})
if not items:
main_content = soup.select_one('.news-content, .story-body, main')
if main_content:
title = soup.select_one('h1, .news-title')
title_text = title.get_text(strip=True) if title else "خبر تسنیم"
content_text = main_content.get_text(strip=True)
if len(content_text) > 200:
items.append({
'title': title_text,
'content': content_text,
'url': base_url,
'date': self._extract_date(soup)
})
except Exception as e:
logger.error(f"Tasnim extraction error: {e}")
return items
def _extract_mehr_content(self, soup: BeautifulSoup, base_url: str, max_items: int) -> List[Dict]:
"""Extract content from Mehr News"""
items = []
try:
articles = soup.select('.news-item, .article-item, .story')[:max_items]
for article in articles:
title_elem = article.select_one('h2, h3, .title, .headline')
if title_elem:
title = title_elem.get_text(strip=True)
content = article.get_text(strip=True)
if len(title) > 10 and len(content) > 100:
items.append({
'title': title,
'content': content,
'url': base_url,
'date': self._extract_date(soup)
})
if not items:
main_content = soup.select_one('.content, .news-body, article')
if main_content:
title = soup.select_one('h1, .page-title')
title_text = title.get_text(strip=True) if title else "خبر مهر"
content_text = main_content.get_text(strip=True)
if len(content_text) > 200:
items.append({
'title': title_text,
'content': content_text,
'url': base_url,
'date': self._extract_date(soup)
})
except Exception as e:
logger.error(f"Mehr extraction error: {e}")
return items
def _extract_fars_content(self, soup: BeautifulSoup, base_url: str, max_items: int) -> List[Dict]:
"""Extract content from Fars News"""
items = []
try:
articles = soup.select('.news, .item, .story-item')[:max_items]
for article in articles:
title_elem = article.select_one('h2, h3, .title, a')
if title_elem:
title = title_elem.get_text(strip=True)
content = article.get_text(strip=True)
if len(title) > 10 and len(content) > 100:
items.append({
'title': title,
'content': content,
'url': base_url,
'date': self._extract_date(soup)
})
if not items:
main_content = soup.select_one('.news-content, .story, main')
if main_content:
title = soup.select_one('h1, .news-title')
title_text = title.get_text(strip=True) if title else "خبر فارس"
content_text = main_content.get_text(strip=True)
if len(content_text) > 200:
items.append({
'title': title_text,
'content': content_text,
'url': base_url,
'date': self._extract_date(soup)
})
except Exception as e:
logger.error(f"Fars extraction error: {e}")
return items
def _extract_generic_content(self, soup: BeautifulSoup, base_url: str, max_items: int) -> List[Dict]:
"""Generic content extraction for unknown sources"""
items = []
try:
articles = soup.select('article, .article, .post, .news-item, .story')[:max_items]
for article in articles:
title_elem = article.select_one('h1, h2, h3, .title, .headline')
if title_elem:
title = title_elem.get_text(strip=True)
content = article.get_text(strip=True)
if len(title) > 10 and len(content) > 150:
items.append({
'title': title,
'content': content,
'url': base_url,
'date': self._extract_date(soup)
})
if not items:
title_elem = soup.select_one('h1, title')
content_elem = soup.select_one('main, .main-content, .content, .entry-content, body')
if title_elem and content_elem:
for unwanted in content_elem(['script', 'style', 'nav', 'header', 'footer']):
unwanted.decompose()
title = title_elem.get_text(strip=True)
content = content_elem.get_text(strip=True)
if len(title) > 5 and len(content) > 200:
items.append({
'title': title,
'content': content,
'url': base_url,
'date': self._extract_date(soup)
})
except Exception as e:
logger.error(f"Generic extraction error: {e}")
return items
def _extract_document_from_soup(self, soup: BeautifulSoup, url: str) -> Optional[LegalDocument]:
"""Extract main document from BeautifulSoup object using source-specific logic"""
try:
items = self._extract_source_specific_content(soup, url, 1)
if not items:
return None
item = items[0]
return LegalDocument(
title=item['title'],
content=item['content'],
source_url=item['url'],
document_type=self._determine_document_type(item['title'], item['content']),
date_published=item['date']
)
except Exception as e:
logger.error(f"Document extraction failed: {e}")
return None
def _extract_additional_articles(self, soup: BeautifulSoup, base_url: str) -> List[LegalDocument]:
"""Extract additional articles from the same page using source-specific logic"""
documents = []
try:
items = self._extract_source_specific_content(soup, base_url, 3)
for item in items:
doc = LegalDocument(
title=item['title'],
content=item['content'],
source_url=item['url'],
document_type=self._determine_document_type(item['title'], item['content']),
date_published=item['date']
)
documents.append(doc)
except Exception as e:
logger.error(f"Additional articles extraction failed: {e}")
return documents[:3]
def _determine_document_type(self, title: str, content: str) -> str:
"""Determine document type based on content"""
text = (title + " " + content).lower()
if any(word in text for word in ['قانون', 'ماده', 'فصل', 'بند', 'تبصره']):
return 'law'
elif any(word in text for word in ['رای', 'حکم', 'دادگاه', 'قاضی']):
return 'ruling'
elif any(word in text for word in ['آیین‌نامه', 'دستورالعمل', 'بخشنامه']):
return 'regulation'
elif any(word in text for word in ['خبر', 'اعلام', 'گزارش', 'اطلاعیه']):
return 'news'
else:
return 'general'
def _extract_date(self, soup: BeautifulSoup) -> Optional[str]:
"""Extract publication date"""
try:
date_selectors = [
'meta[name="article:published_time"]',
'meta[property="article:published_time"]',
'meta[name="date"]',
'meta[name="DC.date"]',
'.date',
'.publish-date',
'.article-date',
'time[datetime]'
]
for selector in date_selectors:
element = soup.select_one(selector)
if element:
date_str = element.get('content') or element.get('datetime') or element.get_text()
if date_str:
return self._normalize_date(date_str)
text = soup.get_text()
persian_date_patterns = [
r'(\d{4}/\d{1,2}/\d{1,2})',
r'(\d{1,2}/\d{1,2}/\d{4})',
r'(\d{4}-\d{1,2}-\d{1,2})'
]
for pattern in persian_date_patterns:
match = re.search(pattern, text)
if match:
return match.group(1)
return None
except Exception:
return None
def _normalize_date(self, date_str: str) -> Optional[str]:
"""Normalize date string to standard format"""
try:
date_str = re.sub(r'[^\d/\-:]', ' ', date_str).strip()
formats = [
'%Y-%m-%d',
'%Y/%m/%d',
'%d/%m/%Y',
'%Y-%m-%d %H:%M:%S',
'%Y/%m/%d %H:%M:%S'
]
for fmt in formats:
try:
parsed_date = datetime.strptime(date_str, fmt)
return parsed_date.strftime('%Y-%m-%d')
except ValueError:
continue
return date_str
except Exception:
return None
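
# Minimal usage sketch, assuming network access to the listed sources and a
# writable working directory for the SQLite database and CSV export. The
# source slice and limits below are illustrative values, not tuned defaults.
if __name__ == "__main__":
    scraper = EnhancedLegalScraper(delay=2.0)

    # Scrape a couple of the predefined sources and persist the processed documents.
    docs = scraper.scrape_real_sources(urls=IRANIAN_LEGAL_SOURCES[:2], max_docs=5)
    for doc in docs:
        scraper.save_document(doc)

    # Aggregate statistics and a sample query; search_with_similarity falls back
    # to LIKE-based text search when no embedding model could be loaded.
    stats = scraper.get_enhanced_statistics()
    print(f"Total documents: {stats['total_documents']}")
    for result in scraper.search_with_similarity("قانون اساسی", limit=5):
        print(f"- {result['title'][:60]} (importance: {result['importance_score']:.2f})")

    # Export everything to CSV for offline review.
    print(f"Exported to: {scraper.export_to_csv()}")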