Spaces:
Sleeping
Sleeping
| import os | |
| import ssl | |
| import smtplib | |
| from datetime import datetime | |
| from email.message import EmailMessage | |
| from typing import Optional, List, Union | |
| from pydantic import BaseModel, EmailStr, Field | |
| # === ENV SETTINGS === | |
| EMAIL_SENDER = os.getenv("EMAIL_SENDER", "[email protected]") | |
| EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD", "yourpassword") | |
| SMTP_SERVER = os.getenv("SMTP_SERVER", "smtp.gmail.com") | |
| SMTP_PORT = int(os.getenv("SMTP_PORT", 465)) | |
| USE_SSL = os.getenv("USE_SSL", "true").lower() in ("true", "1", "yes") | |
| EMAIL_TEST_MODE = os.getenv("EMAIL_TEST_MODE", "false").lower() in ("true", "1", "yes") | |
| # === Models === | |
| class EmailInput(BaseModel): | |
| to: Union[EmailStr, List[EmailStr]] = Field(..., description="Recipient(s) email address") | |
| subject: str = Field(..., description="Email subject line") | |
| body: str = Field(..., description="Plaintext or HTML body") | |
| html: Optional[bool] = Field(default=False, description="Send as HTML?") | |
| attachments: Optional[List[str]] = Field(default=None, description="List of file paths to attach") | |
| # === Email Body Generator === | |
| def generate_email(recipient_name: str, product_name: str, discount: float) -> str: | |
| return f"""\ | |
| Dear {recipient_name}, | |
| π Great news! We're offering you an exclusive {discount}% discount on {product_name}. | |
| Visit our site today and don't miss out! | |
| Best regards, | |
| Your AI Assistant | |
| """ | |
| # === Email Sending Core === | |
| def send_email(email_input: EmailInput) -> dict: | |
| msg = EmailMessage() | |
| msg["From"] = EMAIL_SENDER | |
| msg["To"] = ", ".join(email_input.to) if isinstance(email_input.to, list) else email_input.to | |
| msg["Subject"] = email_input.subject | |
| if email_input.html: | |
| msg.add_alternative(email_input.body, subtype="html") | |
| else: | |
| msg.set_content(email_input.body) | |
| # Add attachments | |
| if email_input.attachments: | |
| for path in email_input.attachments: | |
| if os.path.exists(path): | |
| with open(path, "rb") as f: | |
| data = f.read() | |
| filename = os.path.basename(path) | |
| msg.add_attachment(data, maintype="application", subtype="octet-stream", filename=filename) | |
| # Early exit in test mode | |
| if EMAIL_TEST_MODE: | |
| return { | |
| "status": "β TEST MODE β email not actually sent", | |
| "to": msg["To"], | |
| "subject": msg["Subject"], | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| # Send email via SMTP | |
| try: | |
| if USE_SSL: | |
| context = ssl.create_default_context() | |
| with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, context=context) as smtp: | |
| smtp.login(EMAIL_SENDER, EMAIL_PASSWORD) | |
| smtp.send_message(msg) | |
| else: | |
| with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as smtp: | |
| smtp.ehlo() | |
| smtp.starttls(context=ssl.create_default_context()) | |
| smtp.ehlo() | |
| smtp.login(EMAIL_SENDER, EMAIL_PASSWORD) | |
| smtp.send_message(msg) | |
| return { | |
| "status": "β Email sent successfully", | |
| "to": msg["To"], | |
| "subject": msg["Subject"], | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| return { | |
| "status": f"β Failed to send email: {e}", | |
| "to": msg["To"], | |
| "subject": msg["Subject"], | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| # === Example usage === | |
| if __name__ == "__main__": | |
| email_body = generate_email("Alice", "Super Serum", 25) | |
| email_data = EmailInput( | |
| to=["[email protected]", "[email protected]"], | |
| subject="π Special Deal Just For You!", | |
| body=email_body, | |
| html=False, | |
| attachments=["/path/to/brochure.pdf"] | |
| ) | |
| result = send_email(email_data) | |
| print(result) | |