"""LinkedIn/company-website scraper with SQLite caching and optional Selenium support."""

import requests
from bs4 import BeautifulSoup
import time
import re
from urllib.parse import urlparse, urljoin
import sqlite3

# Selenium is optional: fall back to plain requests-based scraping if it is missing.
try:
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.chrome.service import Service
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from webdriver_manager.chrome import ChromeDriverManager
    SELENIUM_AVAILABLE = True
except ImportError:
    SELENIUM_AVAILABLE = False
    print("⚠️ Selenium not available. Company research will use basic scraping only.")


class LinkedInScraper:
    def __init__(self, timeout=10, use_selenium=False):
        self.timeout = timeout
        self.use_selenium = use_selenium
        self.session = requests.Session()
        # Browser-like headers reduce the chance of being blocked outright.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })

        if self.use_selenium and SELENIUM_AVAILABLE:
            self._setup_selenium()
        elif self.use_selenium and not SELENIUM_AVAILABLE:
            print("⚠️ Selenium requested but not available. Falling back to basic scraping.")
            self.use_selenium = False

    def _setup_selenium(self):
        """Set up the Selenium WebDriver (headless Chrome)."""
        if not SELENIUM_AVAILABLE:
            print("⚠️ Selenium not available. Cannot set up WebDriver.")
            return

        try:
            chrome_options = Options()
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            chrome_options.add_argument('--disable-gpu')
            chrome_options.add_argument('--window-size=1920,1080')

            # Selenium 4 takes the driver path via a Service object rather than
            # as the first positional argument.
            self.driver = webdriver.Chrome(
                service=Service(ChromeDriverManager().install()),
                options=chrome_options
            )
        except Exception as e:
            print(f"Error setting up Selenium: {e}")
            self.use_selenium = False

    def _get_cached_data(self, url):
        """Return cached content for a URL from the local SQLite database, if any."""
        try:
            conn = sqlite3.connect('leads.db')
            cursor = conn.cursor()

            # Create the cache table on first use.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS scraped_cache (
                    url TEXT PRIMARY KEY,
                    content TEXT,
                    scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            cursor.execute('SELECT content FROM scraped_cache WHERE url = ?', (url,))
            result = cursor.fetchone()
            conn.close()

            return result[0] if result else None
        except Exception as e:
            print(f"Cache error: {e}")
            return None

    def _cache_data(self, url, content):
        """Cache scraped data for a URL."""
        try:
            conn = sqlite3.connect('leads.db')
            cursor = conn.cursor()

            cursor.execute('''
                INSERT OR REPLACE INTO scraped_cache (url, content)
                VALUES (?, ?)
            ''', (url, content))

            conn.commit()
            conn.close()
        except Exception as e:
            print(f"Cache save error: {e}")

    def scrape_with_requests(self, url):
        """Scrape URL using requests and BeautifulSoup"""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            content_parts = []

            # Meta description
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                content_parts.append(f"Description: {meta_desc.get('content', '')}")

            # Page title
            title = soup.find('title')
            if title:
                content_parts.append(f"Title: {title.get_text().strip()}")

            # Selectors that commonly hold "about"/description content
            about_selectors = [
                '.about-section',
                '.company-description',
                '.about-us',
                '[class*="about"]',
                '.description',
                '.summary',
                'main',
                '.content'
            ]

            # Stop at the first selector that yields a substantial block of text.
            found_about = False
            for selector in about_selectors:
                elements = soup.select(selector)
                for element in elements:
                    text = element.get_text().strip()
                    if len(text) > 50:
                        content_parts.append(text[:500])
                        found_about = True
                        break
                if found_about:
                    break

            # Fall back to the first few paragraphs if nothing was extracted.
            if not content_parts:
                paragraphs = soup.find_all('p')
                for p in paragraphs[:3]:
                    text = p.get_text().strip()
                    if len(text) > 30:
                        content_parts.append(text[:300])

            return ' | '.join(content_parts) if content_parts else "No content extracted"

        except Exception as e:
            return f"Error scraping {url}: {str(e)}"

    def scrape_with_selenium(self, url):
        """Scrape URL using Selenium (for JavaScript-rendered pages)."""
        try:
            self.driver.get(url)
            WebDriverWait(self.driver, self.timeout).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )

            # Give dynamic content a moment to render.
            time.sleep(2)

            content_parts = []

            # LinkedIn-specific "about" selectors
            linkedin_selectors = [
                '[data-test-id="about-us-description"]',
                '.company-about-us-description',
                '.about-section',
                '[class*="about"]'
            ]

            for selector in linkedin_selectors:
                try:
                    elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
                    for element in elements:
                        text = element.text.strip()
                        if len(text) > 50:
                            content_parts.append(text[:500])
                            break
                except Exception:
                    continue

            # Fall back to generic page containers if nothing matched.
            if not content_parts:
                general_selectors = ['main', '.content', 'article', '.description']
                for selector in general_selectors:
                    try:
                        elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
                        for element in elements:
                            text = element.text.strip()
                            if len(text) > 50:
                                content_parts.append(text[:500])
                                break
                    except Exception:
                        continue

            return ' | '.join(content_parts) if content_parts else "No content extracted"

        except Exception as e:
            return f"Error scraping {url} with Selenium: {str(e)}"

    def scrape_linkedin_profile(self, linkedin_url):
        """Scrape a LinkedIn company profile."""
        if not linkedin_url or not linkedin_url.strip():
            return "No LinkedIn URL provided"

        # Serve from cache if this URL was scraped before.
        cached_content = self._get_cached_data(linkedin_url)
        if cached_content:
            return cached_content

        try:
            # Normalize the URL.
            linkedin_url = linkedin_url.strip()
            if not linkedin_url.startswith('http'):
                linkedin_url = 'https://' + linkedin_url

            if self.use_selenium:
                content = self.scrape_with_selenium(linkedin_url)
            else:
                content = self.scrape_with_requests(linkedin_url)

            self._cache_data(linkedin_url, content)

            return content

        except Exception as e:
            return f"Error accessing LinkedIn: {str(e)}"

    def scrape_linkedin_company(self, linkedin_url):
        """Alias for scrape_linkedin_profile, kept for compatibility."""
        return self.scrape_linkedin_profile(linkedin_url)

    def scrape_company_data(self, linkedin_url):
        """Another compatibility alias for scrape_linkedin_profile."""
        return self.scrape_linkedin_profile(linkedin_url)

    def scrape_company_website(self, company_name):
        """Scrape the company's own website as a fallback."""
        try:
            # Build candidate domains from the company name, e.g. "Acme Corp" -> "acmecorp".
            company_clean = re.sub(r'[^\w\s-]', '', company_name.lower())
            company_clean = re.sub(r'\s+', '', company_clean)

            possible_urls = [
                f"https://{company_clean}.com",
                f"https://www.{company_clean}.com",
                f"https://{company_clean}.org",
                f"https://www.{company_clean}.org"
            ]

            for url in possible_urls:
                cached_content = self._get_cached_data(url)
                if cached_content:
                    return cached_content

                try:
                    if self.use_selenium:
                        content = self.scrape_with_selenium(url)
                    else:
                        content = self.scrape_with_requests(url)

                    if "Error" not in content and len(content) > 50:
                        self._cache_data(url, content)
                        return content
                except Exception:
                    continue

            return f"Could not find website for {company_name}"

        except Exception as e:
            return f"Error finding company website: {str(e)}"

    def scrape_linkedin_or_company(self, linkedin_url, company_name):
        """Main entry point: scrape LinkedIn, falling back to the company website."""
        if linkedin_url and linkedin_url.strip():
            linkedin_content = self.scrape_linkedin_profile(linkedin_url)
            if "Error" not in linkedin_content and len(linkedin_content) > 50:
                return f"LinkedIn: {linkedin_content}"

        company_content = self.scrape_company_website(company_name)
        return f"Company Website: {company_content}"

    def __del__(self):
        """Clean up the Selenium driver."""
        if hasattr(self, 'driver'):
            try:
                self.driver.quit()
            except Exception:
                pass


def scrape_company_info(input_data):
    """
    Scrape company information from a LinkedIn URL or company name.

    Args:
        input_data (str): LinkedIn URL or company name

    Returns:
        str: Scraped company information, or an error message if dependencies are missing
    """
    if not SELENIUM_AVAILABLE:
        return "Company research feature requires additional setup. Please install selenium and webdriver-manager for enterprise features."

    try:
        scraper = LinkedInScraper()

        # Treat LinkedIn URLs and plain company names differently.
        if 'linkedin.com' in input_data.lower():
            result = scraper.scrape_linkedin_or_company(input_data, "")
        else:
            result = scraper.scrape_company_website(input_data)

        return result if result else ""

    except Exception as e:
        print(f"Error in scrape_company_info: {e}")
        return ""
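

# Minimal usage sketch (not part of the library itself): shows how the module's
# entry points are intended to be called. The LinkedIn URL and company name below
# are placeholders, and running this hits the network and writes to the local
# leads.db cache, just like normal use of the scraper.
if __name__ == "__main__":
    # Convenience wrapper: accepts either a LinkedIn URL or a bare company name.
    print(scrape_company_info("https://www.linkedin.com/company/example-co"))

    # Direct use of the scraper class with requests-only scraping (no Selenium).
    scraper = LinkedInScraper(timeout=10, use_selenium=False)
    print(scraper.scrape_linkedin_or_company("", "Example Co"))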