import streamlit as st
import pandas as pd
import sqlite3
from datetime import datetime

from scraper import LinkedInScraper
from email_gen import EmailGenerator

# Configure Streamlit page
st.set_page_config(
    page_title="Cold Email Outreach Assistant",
    page_icon="📧",
    layout="wide"
)

# Initialize session state
if 'processed_data' not in st.session_state:
    st.session_state.processed_data = None
if 'email_generator' not in st.session_state:
    st.session_state.email_generator = None


def init_database():
    """Initialize SQLite database for caching"""
    conn = sqlite3.connect('leads.db')
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS scraped_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT,
            email TEXT,
            company TEXT,
            linkedin_url TEXT,
            scraped_info TEXT,
            generated_subject TEXT,
            generated_email TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    conn.commit()
    conn.close()


def save_to_database(data):
    """Save processed data to database"""
    conn = sqlite3.connect('leads.db')
    cursor = conn.cursor()
    for _, row in data.iterrows():
        # Note: with only the autoincrement id as a constraint, OR REPLACE
        # never triggers, so this effectively appends one row per lead.
        cursor.execute('''
            INSERT OR REPLACE INTO scraped_data
            (name, email, company, linkedin_url, scraped_info, generated_subject, generated_email)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            row['name'],
            row['email'],
            row['company'],
            row['linkedin_url'],
            row.get('scraped_info', ''),
            row.get('generated_subject', ''),
            row.get('generated_email', '')
        ))
    conn.commit()
    conn.close()


def load_from_database():
    """Load data from database"""
    conn = sqlite3.connect('leads.db')
    df = pd.read_sql_query('SELECT * FROM scraped_data ORDER BY created_at DESC', conn)
    conn.close()
    return df


def main():
    st.title("📧 Cold Email Outreach Assistant")
    st.markdown("Upload your leads CSV and generate personalized cold emails using AI")

    # Initialize database
    init_database()

    # Sidebar for configuration
    with st.sidebar:
        st.header("⚙️ Configuration")

        # Model configuration
        st.subheader("AI Model Settings")
        model_option = st.selectbox(
            "Model Type",
            ["Download Vicuna-7B (Recommended)", "Use Custom Model Path"]
        )

        if model_option == "Use Custom Model Path":
            custom_model_path = st.text_input("Custom Model Path", "")
        else:
            custom_model_path = None

        # Email generation settings
        st.subheader("📧 Email Generation")
        tone = st.selectbox(
            "Email Tone",
            ["Professional", "Friendly", "Direct", "Authoritative"],
            index=0,
            help="Choose the tone for generated emails"
        )

        temperature = st.slider(
            "Creativity Level",
            min_value=0.3,
            max_value=1.0,
            value=0.7,
            step=0.1,
            help="Lower = more conservative, Higher = more creative"
        )

        generate_variations = st.checkbox(
            "Generate Multiple Variations",
            value=False,
            help="Generate 3 different email variations per lead"
        )

        # Scraping configuration
        st.subheader("🔍 Scraping Settings")
        scrape_timeout = st.slider("Scrape Timeout (seconds)", 5, 30, 10)
        use_selenium = st.checkbox("Use Selenium (slower but more reliable)", value=False)

    # Main content area
    tab1, tab2, tab3 = st.tabs(["📤 Upload & Process", "📊 Results", "📈 History"])

    with tab1:
        st.header("Upload Your Leads CSV")

        # File upload
        uploaded_file = st.file_uploader(
            "Choose a CSV file",
            type="csv",
            help="CSV should contain columns: name, email, company, linkedin_url"
        )

        if uploaded_file is not None:
            try:
                # Read CSV
                df = pd.read_csv(uploaded_file)

                # Validate columns
                required_columns = ['name', 'email', 'company', 'linkedin_url']
                missing_columns = [col for col in required_columns if col not in df.columns]

                if missing_columns:
                    st.error(f"Missing required columns: {', '.join(missing_columns)}")
                    st.info("Required columns: name, email, company, linkedin_url")
                else:
                    st.success(f"✅ CSV loaded successfully! Found {len(df)} leads")
                    st.dataframe(df.head())

                    # Process data button
                    if st.button("🚀 Start Processing", type="primary"):
                        process_leads(df, scrape_timeout, use_selenium, custom_model_path,
                                      tone, temperature, generate_variations)

            except Exception as e:
                st.error(f"Error reading CSV: {str(e)}")

    with tab2:
        st.header("Processing Results")

        if st.session_state.processed_data is not None:
            df = st.session_state.processed_data

            # Display results
            st.success(f"✅ Processed {len(df)} leads successfully!")

            # Show detailed results
            for idx, row in df.iterrows():
                with st.expander(f"📋 {row['name']} - {row['company']} {'🎯' if row.get('tone_used') else ''}"):
                    col1, col2, col3 = st.columns([2, 3, 1])

                    with col1:
                        st.subheader("📊 Scraped Information")
                        st.text_area("Company Info", row.get('scraped_info', 'No info scraped'),
                                     height=100, key=f"info_{idx}")

                        # Show generation settings if available
                        if row.get('tone_used'):
                            st.write(f"**Tone:** {row.get('tone_used', 'N/A')}")
                            st.write(f"**Temperature:** {row.get('temperature_used', 'N/A')}")

                    with col2:
                        st.subheader("📧 Generated Email")
                        subject = row.get('generated_subject', 'No subject generated')
                        email_body = row.get('generated_email', 'No email generated')

                        # Streamlit enforces a minimum text_area height of 68px
                        st.text_area("Subject", subject, height=68, key=f"subject_{idx}")
                        st.text_area("Email Body", email_body, height=250, key=f"email_{idx}")

                    with col3:
                        st.subheader("📈 Quality")

                        if subject and email_body:
                            subject_len = len(subject)
                            # Get main body without variations
                            main_body = email_body.split('--- VARIATIONS ---')[0].strip()
                            body_words = len(main_body.split())

                            # Quality indicators
                            if 15 <= subject_len <= 65:
                                st.success(f"✅ Subject: {subject_len} chars")
                            else:
                                st.warning(f"⚠️ Subject: {subject_len} chars")

                            if 25 <= body_words <= 100:
                                st.success(f"✅ Body: {body_words} words")
                            else:
                                st.warning(f"⚠️ Body: {body_words} words")

                            # Check for placeholders
                            if '[Your Name]' in email_body or '{' in email_body:
                                st.error("❌ Contains placeholders")
                            else:
                                st.success("✅ No placeholders")

                            # Check for personalization
                            if row['name'] in main_body and row['company'] in main_body:
                                st.success("✅ Well personalized")
                            else:
                                st.warning("⚠️ Low personalization")

                            # Check for CTA
                            cta_words = ['call', 'conversation', 'chat', 'discuss', 'talk', 'meeting']
                            if any(word in main_body.lower() for word in cta_words):
                                st.success("✅ Has call-to-action")
                            else:
                                st.warning("⚠️ Weak call-to-action")

                            # Overall quality score
                            quality_score = 0
                            if 15 <= subject_len <= 65:
                                quality_score += 20
                            if 25 <= body_words <= 100:
                                quality_score += 25
                            if '[Your Name]' not in email_body:
                                quality_score += 25
                            if row['name'] in main_body and row['company'] in main_body:
                                quality_score += 20
                            if any(word in main_body.lower() for word in cta_words):
                                quality_score += 10

                            if quality_score >= 80:
                                st.success(f"🏆 Overall: {quality_score}% - Ready to send!")
                            elif quality_score >= 60:
                                st.warning(f"📝 Overall: {quality_score}% - Needs polish")
                            else:
                                st.error(f"🔧 Overall: {quality_score}% - Needs work")

                    # Quick copy button
                    email_text = f"Subject: {subject}\n\n{email_body}"
                    st.text_area("Copy Email", email_text, height=100, key=f"copy_{idx}")

            # Export functionality
            if st.button("📥 Export to CSV"):
                csv_data = df.to_csv(index=False)
                st.download_button(
                    label="⬇️ Download CSV",
                    data=csv_data,
                    file_name=f"cold_emails_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
                    mime="text/csv"
                )
        else:
            st.info("👆 Upload and process a CSV file to see results here")

    with tab3:
        st.header("Processing History")

        # Load and display historical data
        try:
            history_df = load_from_database()

            if not history_df.empty:
                st.dataframe(history_df)

                # Export history
                if st.button("📥 Export History"):
                    csv_data = history_df.to_csv(index=False)
                    st.download_button(
                        label="⬇️ Download History CSV",
                        data=csv_data,
                        file_name=f"email_history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
                        mime="text/csv"
                    )
            else:
                st.info("No historical data found")

        except Exception as e:
            st.error(f"Error loading history: {str(e)}")


def process_leads(df, scrape_timeout, use_selenium, custom_model_path,
                  tone, temperature, generate_variations):
    """Process the uploaded leads with enhanced email generation"""

    progress_bar = st.progress(0)
    status_text = st.empty()

    try:
        # Initialize components
        status_text.text("🔧 Initializing scraper...")
        scraper = LinkedInScraper(timeout=scrape_timeout, use_selenium=use_selenium)

        status_text.text("🤖 Initializing AI model...")
        if st.session_state.email_generator is None:
            st.session_state.email_generator = EmailGenerator(custom_model_path)
        email_gen = st.session_state.email_generator

        # Process each lead
        processed_data = []
        total_leads = len(df)

        for idx, row in df.iterrows():
            status_text.text(f"🔍 Processing {row['name']} ({idx + 1}/{total_leads})")

            # Scrape information
            scraped_info = scraper.scrape_linkedin_or_company(
                row['linkedin_url'],
                row['company']
            )

            # Generate email with new parameters
            status_text.text(f"✍️ Generating email for {row['name']} ({tone} tone)...")

            if generate_variations:
                # Generate multiple variations
                variations = email_gen.generate_multiple_variations(
                    row['name'],
                    row['company'],
                    scraped_info,
                    num_variations=3,
                    tone=tone
                )

                # Use the first variation as primary
                subject = variations[0]['subject']
                email_body = variations[0]['email_body']

                # Store all variations in a formatted way
                variations_text = "\n\n--- VARIATIONS ---\n"
                for i, var in enumerate(variations, 1):
                    variations_text += f"\nVariation {i} ({var['tone']}):\n"
                    variations_text += f"Subject: {var['subject']}\n"
                    variations_text += f"Body: {var['email_body']}\n"

                email_body += variations_text
            else:
                # Generate single email with specified parameters
                subject, email_body = email_gen.generate_email(
                    row['name'],
                    row['company'],
                    scraped_info,
                    tone=tone,
                    temperature=temperature
                )

            # Add to processed data
            processed_data.append({
                'name': row['name'],
                'email': row['email'],
                'company': row['company'],
                'linkedin_url': row['linkedin_url'],
                'scraped_info': scraped_info,
                'generated_subject': subject,
                'generated_email': email_body,
                'tone_used': tone,
                'temperature_used': temperature
            })

            # Update progress
            progress_bar.progress((idx + 1) / total_leads)

        # Convert to DataFrame and save
        result_df = pd.DataFrame(processed_data)
        st.session_state.processed_data = result_df

        # Save to database
        save_to_database(result_df)

        status_text.text("✅ Processing completed!")
        st.success("🎉 All leads processed successfully!")

        # Show quality metrics
        avg_subject_length = result_df['generated_subject'].str.len().mean()
        avg_body_length = result_df['generated_email'].str.split().str.len().mean()

        st.info(f"📊 Quality Metrics: Avg subject length: {avg_subject_length:.0f} chars, "
                f"Avg body length: {avg_body_length:.0f} words")

    except Exception as e:
        st.error(f"❌ Error during processing: {str(e)}")
        status_text.text("❌ Processing failed")


if __name__ == "__main__":
    main()
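
# ---------------------------------------------------------------------------
# Interface notes (a sketch, not authoritative): this app only assumes the
# following about scraper.py and email_gen.py, inferred from the call sites
# above; the real modules may differ, and anything beyond these parameter
# names is an assumption.
#
#   class LinkedInScraper:
#       def __init__(self, timeout, use_selenium): ...
#       def scrape_linkedin_or_company(self, linkedin_url, company) -> str: ...
#
#   class EmailGenerator:
#       def __init__(self, custom_model_path): ...
#       # returns a (subject, email_body) tuple
#       def generate_email(self, name, company, scraped_info, tone, temperature): ...
#       # returns a list of dicts with 'subject', 'email_body', and 'tone' keys
#       def generate_multiple_variations(self, name, company, scraped_info,
#                                        num_variations, tone): ...
#
# The uploaded CSV must provide the columns: name, email, company, linkedin_url.
# Launch with `streamlit run <this file>` (e.g. `streamlit run app.py` if saved
# under that name).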