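"""
Enhanced Iranian legal/news scraper with Persian NLP support.

Scrapes a predefined list of Iranian news and government sources, enriches each
document with keyword extraction, classification, legal-entity extraction,
summaries, and (optionally) transformer embeddings, and stores the results in
SQLite with export and similarity-search helpers. The heavy dependencies
(PyTorch/transformers, hazm) are optional; the scraper falls back to simpler
text processing when they are missing.
"""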
import requests
import time
import json
import csv
import sqlite3
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union
from dataclasses import dataclass, asdict
from pathlib import Path
import re
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from bs4 import BeautifulSoup

try:
    import torch
    from transformers import AutoTokenizer, AutoModel
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
    print("⚠️ PyTorch not available, running without advanced NLP features")

try:
    import hazm
    from hazm import Normalizer, word_tokenize, sent_tokenize
    HAZM_AVAILABLE = True
except ImportError:
    HAZM_AVAILABLE = False
    print("⚠️ Hazm not available, using basic text processing")

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('legal_scraper.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Predefined Iranian legal and news sources
IRANIAN_LEGAL_SOURCES = [
    "https://www.irna.ir",          # IRNA (Islamic Republic News Agency)
    "https://www.tasnimnews.com",   # Tasnim News Agency
    "https://www.mehrnews.com",     # Mehr News Agency
    "https://www.farsnews.ir",      # Fars News Agency
    "https://iribnews.ir",          # IRIB News Agency
    "https://www.dolat.ir",         # Government portal
    "https://rc.majlis.ir",         # Majlis (Parliament) Research Center
]

@dataclass
class LegalDocument:
    """Enhanced legal document with NLP features"""
    title: str
    content: str
    source_url: str
    document_type: str
    date_published: Optional[str] = None
    date_scraped: Optional[str] = None
    category: Optional[str] = None
    tags: Optional[List[str]] = None
    summary: Optional[str] = None
    importance_score: float = 0.0
    sentiment_score: float = 0.0
    legal_entities: Optional[List[str]] = None
    keywords: Optional[List[str]] = None
    embedding: Optional[List[float]] = None
    language: str = "fa"

    def __post_init__(self):
        if self.date_scraped is None:
            self.date_scraped = datetime.now().isoformat()
        if self.tags is None:
            self.tags = []
        if self.legal_entities is None:
            self.legal_entities = []
        if self.keywords is None:
            self.keywords = []
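
# Illustrative only: how a LegalDocument might be constructed by hand.
# The title, content, and URL below are hypothetical placeholder values.
#
#   doc = LegalDocument(
#       title="قانون نمونه",
#       content="ماده ۱ - ...",
#       source_url="https://example.com/sample-law",
#       document_type="law",
#   )
#   asdict(doc)  # plain dict, e.g. for JSON serialization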

class PersianNLPProcessor:
    """Persian NLP processor using available models"""

    def __init__(self):
        if HAZM_AVAILABLE:
            self.normalizer = Normalizer()
        else:
            self.normalizer = None
        # Only create a torch device when torch is actually importable
        self.device = torch.device('cpu') if TORCH_AVAILABLE else None
        self.tokenizer = None
        self.model = None
        if TORCH_AVAILABLE:
            try:
                model_names = [
                    "HooshvareLab/bert-fa-base-uncased",
                    "HooshvareLab/bert-base-parsbert-uncased",
                    "distilbert-base-multilingual-cased"
                ]
                for model_name in model_names:
                    try:
                        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
                        self.model = AutoModel.from_pretrained(model_name)
                        self.model.to(self.device)
                        logger.info(f"✅ Loaded model: {model_name}")
                        break
                    except Exception as e:
                        logger.warning(f"⚠️ Failed to load {model_name}: {e}")
                        continue
            except Exception as e:
                logger.error(f"❌ Failed to load any Persian BERT model: {e}")
        self.legal_categories = {
            'قانون': ['قانون', 'ماده', 'بند', 'فصل', 'تبصره', 'اصلاحیه'],
            'رای': ['رای', 'حکم', 'دادگاه', 'قاضی', 'محکوم', 'دادرسی'],
            'آییننامه': ['آییننامه', 'دستورالعمل', 'بخشنامه', 'مقررات'],
            'اخبار': ['خبر', 'گزارش', 'اعلام', 'اطلاعیه', 'بیانیه'],
            'نظریه': ['نظریه', 'تفسیر', 'استعلام', 'پاسخ', 'رأی']
        }
        self.tfidf = None
        self._init_tfidf()

    def _init_tfidf(self):
        """Initialize TF-IDF vectorizer"""
        try:
            self.tfidf = TfidfVectorizer(
                max_features=1000,
                stop_words=self._get_persian_stopwords(),
                ngram_range=(1, 2),
                min_df=1,
                max_df=1.0  # max_df < 1.0 prunes every term when fitting on a single document
            )
        except Exception as e:
            logger.error(f"TF-IDF initialization failed: {e}")

    def _get_persian_stopwords(self) -> List[str]:
        """Get Persian stopwords"""
        return [
            'در', 'به', 'از', 'که', 'این', 'آن', 'با', 'را', 'و', 'است',
            'برای', 'تا', 'کرد', 'شد', 'می', 'خود', 'هم', 'نیز', 'یا', 'اما',
            'اگر', 'چون', 'پس', 'بعد', 'قبل', 'روی', 'زیر', 'کنار', 'داخل',
            'نیست', 'بود', 'باشد', 'کند', 'کنند', 'شود', 'گردد', 'دارد', 'دارند'
        ]

    def normalize_text(self, text: str) -> str:
        """Normalize Persian text"""
        if not text:
            return ""
        try:
            text = re.sub(r'[^\w\s\u0600-\u06FF]', ' ', text)
            text = re.sub(r'\s+', ' ', text)
            if self.normalizer:
                text = self.normalizer.normalize(text)
            return text.strip()
        except Exception as e:
            logger.error(f"Text normalization failed: {e}")
            return text.strip()

    def extract_keywords(self, text: str, top_k: int = 10) -> List[str]:
        """Extract keywords using TF-IDF"""
        try:
            if not self.tfidf or not text:
                return []
            normalized_text = self.normalize_text(text)
            if HAZM_AVAILABLE:
                tokens = word_tokenize(normalized_text)
                processed_text = ' '.join(tokens)
            else:
                processed_text = normalized_text
            tfidf_matrix = self.tfidf.fit_transform([processed_text])
            feature_names = self.tfidf.get_feature_names_out()
            scores = tfidf_matrix.toarray()[0]
            keyword_scores = list(zip(feature_names, scores))
            keyword_scores.sort(key=lambda x: x[1], reverse=True)
            return [kw[0] for kw in keyword_scores[:top_k] if kw[1] > 0]
        except Exception as e:
            logger.error(f"Keyword extraction failed: {e}")
            return []

    def classify_document(self, text: str) -> Tuple[str, float]:
        """Classify document type with confidence score"""
        try:
            normalized_text = self.normalize_text(text.lower())
            scores = {}
            for category, keywords in self.legal_categories.items():
                score = 0
                for keyword in keywords:
                    count = normalized_text.count(keyword)
                    score += count * (len(keyword) / 5)
                if len(normalized_text) > 0:
                    scores[category] = score / (len(normalized_text) / 1000)
                else:
                    scores[category] = 0
            if not scores or max(scores.values()) == 0:
                return "عمومی", 0.0
            best_category = max(scores.items(), key=lambda x: x[1])
            total_score = sum(scores.values())
            confidence = min(best_category[1] / total_score, 1.0) if total_score > 0 else 0.0
            return best_category[0], confidence
        except Exception as e:
            logger.error(f"Document classification failed: {e}")
            return "عمومی", 0.0

    def calculate_importance_score(self, doc: LegalDocument) -> float:
        """Calculate document importance score"""
        try:
            score = 0.0
            title_lower = doc.title.lower()
            high_importance_words = ['قانون', 'اساسی', 'حکم', 'رای', 'مصوبه']
            medium_importance_words = ['آییننامه', 'بخشنامه', 'دستورالعمل']
            for word in high_importance_words:
                if word in title_lower:
                    score += 0.3
                    break
            for word in medium_importance_words:
                if word in title_lower:
                    score += 0.2
                    break
            content_length = len(doc.content)
            if content_length > 5000:
                score += 0.25
            elif content_length > 2000:
                score += 0.15
            elif content_length > 500:
                score += 0.1
            if doc.date_published:
                try:
                    date_formats = ['%Y-%m-%d', '%Y/%m/%d', '%d/%m/%Y']
                    pub_date = None
                    for fmt in date_formats:
                        try:
                            pub_date = datetime.strptime(doc.date_published, fmt)
                            break
                        except ValueError:
                            continue
                    if pub_date:
                        days_old = (datetime.now() - pub_date).days
                        if days_old < 30:
                            score += 0.25
                        elif days_old < 365:
                            score += 0.15
                        elif days_old < 1825:
                            score += 0.05
                except Exception:
                    pass
            legal_keywords = ['قانون', 'ماده', 'بند', 'حکم', 'رای', 'دادگاه', 'محکمه']
            content_lower = doc.content.lower()
            keyword_count = sum(content_lower.count(kw) for kw in legal_keywords)
            word_count = len(doc.content.split())
            if word_count > 0:
                keyword_density = keyword_count / word_count
                score += min(keyword_density * 5, 0.2)
            type_bonuses = {
                'law': 0.2,
                'ruling': 0.15,
                'regulation': 0.1,
                'news': 0.05
            }
            score += type_bonuses.get(doc.document_type, 0)
            return min(score, 1.0)
        except Exception as e:
            logger.error(f"Importance score calculation failed: {e}")
            return 0.0

    def extract_legal_entities(self, text: str) -> List[str]:
        """Extract legal entities from text"""
        try:
            entities = []
            # Non-capturing groups so findall() returns full matches, not group tuples
            patterns = {
                'قوانین': r'قانون\s+[\u0600-\u06FF\s]{3,30}',
                'مواد': r'ماده\s+\d+[\u0600-\u06FF\s]*',
                'دادگاهها': r'دادگاه\s+[\u0600-\u06FF\s]{3,30}',
                'مراجع': r'(?:وزارت|سازمان|اداره|شورای|کمیته)\s+[\u0600-\u06FF\s]{3,30}',
                'احکام': r'(?:حکم|رای)\s+(?:شماره\s+)?\d+',
            }
            for entity_type, pattern in patterns.items():
                matches = re.findall(pattern, text)
                for match in matches:
                    clean_match = re.sub(r'\s+', ' ', match.strip())
                    if len(clean_match) > 5 and len(clean_match) < 100:
                        entities.append(clean_match)
            unique_entities = list(dict.fromkeys(entities))
            return unique_entities[:15]
        except Exception as e:
            logger.error(f"Entity extraction failed: {e}")
            return []

    def get_text_embedding(self, text: str) -> Optional[List[float]]:
        """Get text embedding using available model"""
        if not self.model or not self.tokenizer or not TORCH_AVAILABLE:
            return None
        try:
            normalized_text = self.normalize_text(text)
            if len(normalized_text) > 512:
                normalized_text = normalized_text[:512]
            if not normalized_text:
                return None
            inputs = self.tokenizer(
                normalized_text,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=512
            ).to(self.device)
            with torch.no_grad():
                outputs = self.model(**inputs)
                embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy()[0]
            return embedding.tolist()
        except Exception as e:
            logger.error(f"Embedding generation failed: {e}")
            return None

    def generate_summary(self, text: str, max_length: int = 200) -> str:
        """Generate text summary"""
        try:
            if len(text) <= max_length:
                return text
            if HAZM_AVAILABLE:
                sentences = sent_tokenize(text)
            else:
                sentences = re.split(r'[.!?]+', text)
            sentences = [s.strip() for s in sentences if s.strip()]
            if len(sentences) <= 2:
                return text[:max_length] + "..." if len(text) > max_length else text
            keywords = self.extract_keywords(text, top_k=15)
            sentence_scores = []
            for sentence in sentences:
                if len(sentence) < 20:
                    continue
                score = 0
                sentence_lower = sentence.lower()
                for kw in keywords:
                    if kw in sentence_lower:
                        score += 1
                legal_terms = ['قانون', 'ماده', 'حکم', 'رای', 'دادگاه']
                for term in legal_terms:
                    if term in sentence_lower:
                        score += 0.5
                if len(sentence) > 200:
                    score *= 0.8
                sentence_scores.append((sentence, score))
            sentence_scores.sort(key=lambda x: x[1], reverse=True)
            selected_sentences = []
            current_length = 0
            for sentence, score in sentence_scores:
                if current_length + len(sentence) <= max_length:
                    selected_sentences.append(sentence)
                    current_length += len(sentence)
                else:
                    break
            if not selected_sentences:
                return text[:max_length] + "..."
            summary = ' '.join(selected_sentences)
            return summary if len(summary) <= max_length else summary[:max_length] + "..."
        except Exception as e:
            logger.error(f"Summary generation failed: {e}")
            return text[:max_length] + "..." if len(text) > max_length else text

    def process_document(self, doc: LegalDocument) -> LegalDocument:
        """Process document with all available NLP features"""
        try:
            logger.info(f"Processing document: {doc.title[:50]}...")
            doc.keywords = self.extract_keywords(doc.content)
            doc_type, confidence = self.classify_document(doc.content)
            if confidence > 0.3:
                doc.category = doc_type
            doc.importance_score = self.calculate_importance_score(doc)
            doc.legal_entities = self.extract_legal_entities(doc.content)
            doc.summary = self.generate_summary(doc.content)
            doc.embedding = self.get_text_embedding(doc.content)
            logger.info(f"✅ Processed: {doc.title[:30]}... (Score: {doc.importance_score:.2f})")
            return doc
        except Exception as e:
            logger.error(f"Document processing failed: {e}")
            return doc

class EnhancedLegalScraper:
    """Enhanced legal scraper with real web scraping and NLP"""

    def __init__(self, delay: float = 1.0):
        self.delay = delay
        self.session = requests.Session()
        try:
            self.nlp_processor = PersianNLPProcessor()
            logger.info("✅ NLP processor initialized")
        except Exception as e:
            logger.error(f"❌ NLP processor initialization failed: {e}")
            self.nlp_processor = None
        self.db_path = self._get_db_path()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'fa,en-US;q=0.7,en;q=0.3',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })
        self._init_database()

    def _get_db_path(self) -> str:
        """Get appropriate database path for the environment"""
        possible_paths = [
            "/tmp/legal_scraper.db",
            "./data/legal_scraper.db",
            "legal_scraper.db"
        ]
        for path in possible_paths:
            try:
                Path(path).parent.mkdir(parents=True, exist_ok=True)
                return path
            except OSError:
                continue
        return ":memory:"

    def _init_database(self):
        """Initialize enhanced database with NLP fields"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS legal_documents (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title TEXT NOT NULL,
                    content TEXT NOT NULL,
                    source_url TEXT UNIQUE NOT NULL,
                    document_type TEXT NOT NULL,
                    date_published TEXT,
                    date_scraped TEXT NOT NULL,
                    category TEXT,
                    tags TEXT,
                    summary TEXT,
                    importance_score REAL DEFAULT 0.0,
                    sentiment_score REAL DEFAULT 0.0,
                    legal_entities TEXT,
                    keywords TEXT,
                    embedding TEXT,
                    language TEXT DEFAULT 'fa',
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            indexes = [
                'CREATE INDEX IF NOT EXISTS idx_source_url ON legal_documents(source_url)',
                'CREATE INDEX IF NOT EXISTS idx_document_type ON legal_documents(document_type)',
                'CREATE INDEX IF NOT EXISTS idx_importance_score ON legal_documents(importance_score DESC)',
                'CREATE INDEX IF NOT EXISTS idx_category ON legal_documents(category)',
                'CREATE INDEX IF NOT EXISTS idx_date_published ON legal_documents(date_published)',
                'CREATE INDEX IF NOT EXISTS idx_date_scraped ON legal_documents(date_scraped DESC)'
            ]
            for index in indexes:
                cursor.execute(index)
            conn.commit()
            conn.close()
            logger.info(f"✅ Database initialized: {self.db_path}")
        except Exception as e:
            logger.error(f"❌ Database initialization failed: {e}")
            raise

    def save_document(self, doc: LegalDocument) -> bool:
        """Save enhanced document to database"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO legal_documents
                (title, content, source_url, document_type, date_published,
                 date_scraped, category, tags, summary, importance_score,
                 sentiment_score, legal_entities, keywords, embedding, language)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                doc.title,
                doc.content,
                doc.source_url,
                doc.document_type,
                doc.date_published,
                doc.date_scraped,
                doc.category,
                json.dumps(doc.tags, ensure_ascii=False) if doc.tags else None,
                doc.summary,
                doc.importance_score,
                doc.sentiment_score,
                json.dumps(doc.legal_entities, ensure_ascii=False) if doc.legal_entities else None,
                json.dumps(doc.keywords, ensure_ascii=False) if doc.keywords else None,
                json.dumps(doc.embedding) if doc.embedding else None,
                doc.language
            ))
            conn.commit()
            conn.close()
            return True
        except Exception as e:
            logger.error(f"Failed to save document {doc.source_url}: {e}")
            return False

    def get_enhanced_statistics(self) -> Dict:
        """Get comprehensive statistics with NLP insights"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            stats = {}
            cursor.execute('SELECT COUNT(*) FROM legal_documents')
            stats['total_documents'] = cursor.fetchone()[0]
            cursor.execute('SELECT document_type, COUNT(*) FROM legal_documents GROUP BY document_type')
            stats['by_type'] = dict(cursor.fetchall())
            cursor.execute('SELECT category, COUNT(*) FROM legal_documents WHERE category IS NOT NULL GROUP BY category')
            stats['by_category'] = dict(cursor.fetchall())
            cursor.execute('SELECT COUNT(*) FROM legal_documents WHERE importance_score >= 0.7')
            high_importance = cursor.fetchone()[0]
            cursor.execute('SELECT COUNT(*) FROM legal_documents WHERE importance_score >= 0.3 AND importance_score < 0.7')
            medium_importance = cursor.fetchone()[0]
            cursor.execute('SELECT COUNT(*) FROM legal_documents WHERE importance_score < 0.3')
            low_importance = cursor.fetchone()[0]
            stats['importance_distribution'] = {
                'high': high_importance,
                'medium': medium_importance,
                'low': low_importance
            }
            cursor.execute('SELECT keywords FROM legal_documents WHERE keywords IS NOT NULL')
            all_keywords = []
            for row in cursor.fetchall():
                try:
                    keywords = json.loads(row[0])
                    all_keywords.extend(keywords)
                except (json.JSONDecodeError, TypeError):
                    continue
            if all_keywords:
                keyword_counts = {}
                for kw in all_keywords:
                    keyword_counts[kw] = keyword_counts.get(kw, 0) + 1
                top_keywords = sorted(keyword_counts.items(), key=lambda x: x[1], reverse=True)[:25]
                stats['top_keywords'] = dict(top_keywords)
            cursor.execute('''
                SELECT DATE(date_scraped) as day, COUNT(*)
                FROM legal_documents
                WHERE date_scraped >= date('now', '-7 days')
                GROUP BY DATE(date_scraped)
                ORDER BY day DESC
            ''')
            stats['recent_activity'] = dict(cursor.fetchall())
            cursor.execute('''
                SELECT document_type, AVG(importance_score)
                FROM legal_documents
                GROUP BY document_type
            ''')
            stats['avg_importance_by_type'] = dict(cursor.fetchall())
            cursor.execute('SELECT COUNT(*) FROM legal_documents WHERE embedding IS NOT NULL')
            stats['documents_with_embeddings'] = cursor.fetchone()[0]
            cursor.execute('SELECT language, COUNT(*) FROM legal_documents GROUP BY language')
            stats['by_language'] = dict(cursor.fetchall())
            conn.close()
            return stats
        except Exception as e:
            logger.error(f"Statistics generation failed: {e}")
            return {
                'total_documents': 0,
                'by_type': {},
                'by_category': {},
                'importance_distribution': {'high': 0, 'medium': 0, 'low': 0},
                'top_keywords': {},
                'recent_activity': {},
                'avg_importance_by_type': {},
                'documents_with_embeddings': 0,
                'by_language': {}
            }

    def search_with_similarity(self, query: str, limit: int = 20) -> List[Dict]:
        """Advanced search using embeddings and similarity"""
        if not self.nlp_processor or not self.nlp_processor.model:
            return self._text_search(query, limit)
        try:
            query_embedding = self.nlp_processor.get_text_embedding(query)
            if not query_embedding:
                return self._text_search(query, limit)
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                SELECT id, title, content, source_url, document_type,
                       importance_score, summary, embedding
                FROM legal_documents
                WHERE embedding IS NOT NULL
            ''')
            results = []
            query_vector = np.array(query_embedding)
            for row in cursor.fetchall():
                try:
                    doc_embedding = json.loads(row[7])
                    doc_vector = np.array(doc_embedding)
                    similarity = cosine_similarity([query_vector], [doc_vector])[0][0]
                    combined_score = (similarity * 0.7) + (row[5] * 0.3)
                    results.append({
                        'id': row[0],
                        'title': row[1],
                        'content': row[2][:500] + "..." if len(row[2]) > 500 else row[2],
                        'source_url': row[3],
                        'document_type': row[4],
                        'importance_score': row[5],
                        'summary': row[6],
                        'similarity_score': similarity,
                        'combined_score': combined_score
                    })
                except Exception as e:
                    logger.error(f"Error processing document embedding: {e}")
                    continue
            results.sort(key=lambda x: x['combined_score'], reverse=True)
            conn.close()
            return results[:limit]
        except Exception as e:
            logger.error(f"Similarity search failed: {e}")
            return self._text_search(query, limit)

    def _text_search(self, query: str, limit: int = 20) -> List[Dict]:
        """Fallback text search"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            if self.nlp_processor:
                normalized_query = self.nlp_processor.normalize_text(query)
            else:
                normalized_query = query
            query_words = normalized_query.split()
            if not query_words:
                # An empty query would otherwise produce an empty WHERE clause
                conn.close()
                return []
            search_conditions = []
            params = []
            for word in query_words:
                search_conditions.append("(title LIKE ? OR content LIKE ?)")
                params.extend([f'%{word}%', f'%{word}%'])
            where_clause = " OR ".join(search_conditions)
            cursor.execute(f'''
                SELECT id, title, content, source_url, document_type,
                       importance_score, summary
                FROM legal_documents
                WHERE {where_clause}
                ORDER BY importance_score DESC
                LIMIT ?
            ''', params + [limit])
            results = []
            for row in cursor.fetchall():
                results.append({
                    'id': row[0],
                    'title': row[1],
                    'content': row[2][:500] + "..." if len(row[2]) > 500 else row[2],
                    'source_url': row[3],
                    'document_type': row[4],
                    'importance_score': row[5],
                    'summary': row[6],
                    'similarity_score': 0.0
                })
            conn.close()
            return results
        except Exception as e:
            logger.error(f"Text search failed: {e}")
            return []

    def export_to_csv(self, filename: Optional[str] = None) -> str:
        """Export data to CSV with full details"""
        try:
            if not filename:
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                filename = f"legal_documents_{timestamp}.csv"
            conn = sqlite3.connect(self.db_path)
            query = '''
                SELECT title, content, source_url, document_type,
                       date_published, date_scraped, category, summary,
                       importance_score, keywords, legal_entities
                FROM legal_documents
                ORDER BY importance_score DESC, date_scraped DESC
            '''
            df = pd.read_sql_query(query, conn)
            conn.close()
            for col in ['keywords', 'legal_entities']:
                if col in df.columns:
                    df[col] = df[col].apply(lambda x: ', '.join(json.loads(x)) if x else '')
            df.to_csv(filename, index=False, encoding='utf-8-sig')
            logger.info(f"✅ Data exported to CSV: {filename}")
            return filename
        except Exception as e:
            logger.error(f"CSV export failed: {e}")
            return ""

    def scrape_real_sources(self, urls: List[str] = IRANIAN_LEGAL_SOURCES, max_docs: int = 20) -> List[LegalDocument]:
        """Real web scraping implementation with source-specific extraction"""
        documents = []
        for i, url in enumerate(urls):
            if len(documents) >= max_docs:
                break
            try:
                logger.info(f"🔄 Scraping {i+1}/{len(urls)}: {url}")
                time.sleep(self.delay)
                response = self.session.get(url, timeout=15)
                response.raise_for_status()
                if response.encoding == 'ISO-8859-1':
                    response.encoding = response.apparent_encoding
                soup = BeautifulSoup(response.content, 'html.parser')
                # Extract documents using source-specific logic
                extracted_items = self._extract_source_specific_content(soup, url, max_docs - len(documents))
                for item in extracted_items:
                    if len(documents) >= max_docs:
                        break
                    doc = LegalDocument(
                        title=item['title'],
                        content=item['content'],
                        source_url=item['url'],
                        document_type=self._determine_document_type(item['title'], item['content']),
                        date_published=item['date']
                    )
                    if self.nlp_processor:
                        doc = self.nlp_processor.process_document(doc)
                    documents.append(doc)
                    logger.info(f"✅ Extracted: {doc.title[:50]}...")
            except Exception as e:
                logger.error(f"❌ Error scraping {url}: {e}")
                continue
        documents.sort(key=lambda x: x.importance_score, reverse=True)
        return documents

    def _extract_source_specific_content(self, soup: BeautifulSoup, url: str, max_items: int) -> List[Dict]:
        """Extract content based on source-specific selectors"""
        if 'irna.ir' in url:
            return self._extract_irna_content(soup, url, max_items)
        elif 'tasnimnews.com' in url:
            return self._extract_tasnim_content(soup, url, max_items)
        elif 'mehrnews.com' in url:
            return self._extract_mehr_content(soup, url, max_items)
        elif 'farsnews.ir' in url:
            return self._extract_fars_content(soup, url, max_items)
        else:
            return self._extract_generic_content(soup, url, max_items)

    def _extract_irna_content(self, soup: BeautifulSoup, base_url: str, max_items: int) -> List[Dict]:
        """Extract content from IRNA"""
        items = []
        try:
            articles = soup.select('.news-item, .article, .story')[:max_items]
            for article in articles:
                # Search within the current article, not the whole page
                title_elem = article.select_one('h1, h2, h3, .title, .headline, a')
                if title_elem:
                    title = title_elem.get_text(strip=True)
                    content = article.get_text(strip=True)
                    if len(title) > 10 and len(content) > 100:
                        items.append({
                            'title': title,
                            'content': content,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
            if not items:
                main_content = soup.select_one('main, .main-content, .content, article')
                if main_content:
                    title = soup.select_one('h1, title')
                    title_text = title.get_text(strip=True) if title else "خبر ایرنا"
                    content_text = main_content.get_text(strip=True)
                    if len(content_text) > 200:
                        items.append({
                            'title': title_text,
                            'content': content_text,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
        except Exception as e:
            logger.error(f"IRNA extraction error: {e}")
        return items

    def _extract_tasnim_content(self, soup: BeautifulSoup, base_url: str, max_items: int) -> List[Dict]:
        """Extract content from Tasnim"""
        items = []
        try:
            articles = soup.select('.news-box, .item, .story-item')[:max_items]
            for article in articles:
                title_elem = article.select_one('h2, h3, .title, a')
                if title_elem:
                    title = title_elem.get_text(strip=True)
                    content = article.get_text(strip=True)
                    if len(title) > 10 and len(content) > 100:
                        items.append({
                            'title': title,
                            'content': content,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
            if not items:
                main_content = soup.select_one('.news-content, .story-body, main')
                if main_content:
                    title = soup.select_one('h1, .news-title')
                    title_text = title.get_text(strip=True) if title else "خبر تسنیم"
                    content_text = main_content.get_text(strip=True)
                    if len(content_text) > 200:
                        items.append({
                            'title': title_text,
                            'content': content_text,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
        except Exception as e:
            logger.error(f"Tasnim extraction error: {e}")
        return items

    def _extract_mehr_content(self, soup: BeautifulSoup, base_url: str, max_items: int) -> List[Dict]:
        """Extract content from Mehr News"""
        items = []
        try:
            articles = soup.select('.news-item, .article-item, .story')[:max_items]
            for article in articles:
                title_elem = article.select_one('h2, h3, .title, .headline')
                if title_elem:
                    title = title_elem.get_text(strip=True)
                    content = article.get_text(strip=True)
                    if len(title) > 10 and len(content) > 100:
                        items.append({
                            'title': title,
                            'content': content,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
            if not items:
                main_content = soup.select_one('.content, .news-body, article')
                if main_content:
                    title = soup.select_one('h1, .page-title')
                    title_text = title.get_text(strip=True) if title else "خبر مهر"
                    content_text = main_content.get_text(strip=True)
                    if len(content_text) > 200:
                        items.append({
                            'title': title_text,
                            'content': content_text,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
        except Exception as e:
            logger.error(f"Mehr extraction error: {e}")
        return items

    def _extract_fars_content(self, soup: BeautifulSoup, base_url: str, max_items: int) -> List[Dict]:
        """Extract content from Fars News"""
        items = []
        try:
            articles = soup.select('.news, .item, .story-item')[:max_items]
            for article in articles:
                title_elem = article.select_one('h2, h3, .title, a')
                if title_elem:
                    title = title_elem.get_text(strip=True)
                    content = article.get_text(strip=True)
                    if len(title) > 10 and len(content) > 100:
                        items.append({
                            'title': title,
                            'content': content,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
            if not items:
                main_content = soup.select_one('.news-content, .story, main')
                if main_content:
                    title = soup.select_one('h1, .news-title')
                    title_text = title.get_text(strip=True) if title else "خبر فارس"
                    content_text = main_content.get_text(strip=True)
                    if len(content_text) > 200:
                        items.append({
                            'title': title_text,
                            'content': content_text,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
        except Exception as e:
            logger.error(f"Fars extraction error: {e}")
        return items

    def _extract_generic_content(self, soup: BeautifulSoup, base_url: str, max_items: int) -> List[Dict]:
        """Generic content extraction for unknown sources"""
        items = []
        try:
            articles = soup.select('article, .article, .post, .news-item, .story')[:max_items]
            for article in articles:
                title_elem = article.select_one('h1, h2, h3, .title, .headline')
                if title_elem:
                    title = title_elem.get_text(strip=True)
                    content = article.get_text(strip=True)
                    if len(title) > 10 and len(content) > 150:
                        items.append({
                            'title': title,
                            'content': content,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
            if not items:
                title_elem = soup.select_one('h1, title')
                content_elem = soup.select_one('main, .main-content, .content, .entry-content, body')
                if title_elem and content_elem:
                    for unwanted in content_elem(['script', 'style', 'nav', 'header', 'footer']):
                        unwanted.decompose()
                    title = title_elem.get_text(strip=True)
                    content = content_elem.get_text(strip=True)
                    if len(title) > 5 and len(content) > 200:
                        items.append({
                            'title': title,
                            'content': content,
                            'url': base_url,
                            'date': self._extract_date(soup)
                        })
        except Exception as e:
            logger.error(f"Generic extraction error: {e}")
        return items

    def _extract_document_from_soup(self, soup: BeautifulSoup, url: str) -> Optional[LegalDocument]:
        """Extract main document from BeautifulSoup object using source-specific logic"""
        try:
            items = self._extract_source_specific_content(soup, url, 1)
            if not items:
                return None
            item = items[0]
            return LegalDocument(
                title=item['title'],
                content=item['content'],
                source_url=item['url'],
                document_type=self._determine_document_type(item['title'], item['content']),
                date_published=item['date']
            )
        except Exception as e:
            logger.error(f"Document extraction failed: {e}")
            return None

    def _extract_additional_articles(self, soup: BeautifulSoup, base_url: str) -> List[LegalDocument]:
        """Extract additional articles from the same page using source-specific logic"""
        documents = []
        try:
            items = self._extract_source_specific_content(soup, base_url, 3)
            for item in items:
                doc = LegalDocument(
                    title=item['title'],
                    content=item['content'],
                    source_url=item['url'],
                    document_type=self._determine_document_type(item['title'], item['content']),
                    date_published=item['date']
                )
                documents.append(doc)
        except Exception as e:
            logger.error(f"Additional articles extraction failed: {e}")
        return documents[:3]

    def _determine_document_type(self, title: str, content: str) -> str:
        """Determine document type based on content"""
        text = (title + " " + content).lower()
        if any(word in text for word in ['قانون', 'ماده', 'فصل', 'بند', 'تبصره']):
            return 'law'
        elif any(word in text for word in ['رای', 'حکم', 'دادگاه', 'قاضی']):
            return 'ruling'
        elif any(word in text for word in ['آییننامه', 'دستورالعمل', 'بخشنامه']):
            return 'regulation'
        elif any(word in text for word in ['خبر', 'اعلام', 'گزارش', 'اطلاعیه']):
            return 'news'
        else:
            return 'general'

    def _extract_date(self, soup: BeautifulSoup) -> Optional[str]:
        """Extract publication date"""
        try:
            date_selectors = [
                'meta[name="article:published_time"]',
                'meta[property="article:published_time"]',
                'meta[name="date"]',
                'meta[name="DC.date"]',
                '.date',
                '.publish-date',
                '.article-date',
                'time[datetime]'
            ]
            for selector in date_selectors:
                element = soup.select_one(selector)
                if element:
                    date_str = element.get('content') or element.get('datetime') or element.get_text()
                    if date_str:
                        return self._normalize_date(date_str)
            text = soup.get_text()
            persian_date_patterns = [
                r'(\d{4}/\d{1,2}/\d{1,2})',
                r'(\d{1,2}/\d{1,2}/\d{4})',
                r'(\d{4}-\d{1,2}-\d{1,2})'
            ]
            for pattern in persian_date_patterns:
                match = re.search(pattern, text)
                if match:
                    return match.group(1)
            return None
        except Exception:
            return None

    def _normalize_date(self, date_str: str) -> Optional[str]:
        """Normalize date string to standard format"""
        try:
            date_str = re.sub(r'[^\d/\-:]', ' ', date_str).strip()
            formats = [
                '%Y-%m-%d',
                '%Y/%m/%d',
                '%d/%m/%Y',
                '%Y-%m-%d %H:%M:%S',
                '%Y/%m/%d %H:%M:%S'
            ]
            for fmt in formats:
                try:
                    parsed_date = datetime.strptime(date_str, fmt)
                    return parsed_date.strftime('%Y-%m-%d')
                except ValueError:
                    continue
            return date_str
        except Exception:
            return None
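

if __name__ == "__main__":
    # Minimal end-to-end sketch of how the classes above fit together; the
    # search query and document limit are illustrative values, not defaults.
    scraper = EnhancedLegalScraper(delay=1.0)
    scraped_docs = scraper.scrape_real_sources(max_docs=5)
    for scraped_doc in scraped_docs:
        scraper.save_document(scraped_doc)
    stats = scraper.get_enhanced_statistics()
    print(f"Total documents stored: {stats['total_documents']}")
    for result in scraper.search_with_similarity("قانون کار", limit=5):
        # Fallback text search results carry no combined_score
        score = result.get('combined_score', result['importance_score'])
        print(f"- {result['title'][:60]} (score: {score:.2f})")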