Spaces:
Build error
Build error
| import gradio as gr | |
| import pandas as pd | |
| from simple_salesforce import Salesforce | |
| from datetime import datetime | |
| import logging | |
| import json | |
| from faker import Faker | |
| import random | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Global connection and state | |
| sf_connection = None | |
| available_objects = [] | |
| object_schemas = {} | |
| fake = Faker() | |
| def get_salesforce_objects(): | |
| """Get list of available Salesforce objects""" | |
| global sf_connection, available_objects | |
| if not sf_connection: | |
| return [] | |
| try: | |
| # Get commonly used objects and test their accessibility | |
| common_objects = [ | |
| 'Account', 'Contact', 'Lead', 'Opportunity', 'Case', | |
| 'Campaign', 'User', 'Product2', 'Task', 'Event' | |
| ] | |
| available_objects = [] | |
| for obj_name in common_objects: | |
| try: | |
| obj = getattr(sf_connection, obj_name) | |
| obj.describe() | |
| available_objects.append(obj_name) | |
| except: | |
| continue | |
| return available_objects | |
| except Exception as e: | |
| logger.error(f"Error getting objects: {str(e)}") | |
| return ['Account', 'Contact', 'Lead'] | |
| def get_object_schema(object_name): | |
| """Get schema for a specific Salesforce object""" | |
| global sf_connection, object_schemas | |
| if not sf_connection or not object_name: | |
| return {} | |
| try: | |
| if object_name not in object_schemas: | |
| obj = getattr(sf_connection, object_name) | |
| metadata = obj.describe() | |
| schema = { | |
| 'name': object_name, | |
| 'label': metadata.get('label', object_name), | |
| 'fields': [] | |
| } | |
| for field in metadata['fields']: | |
| if field['createable'] or field['updateable']: | |
| field_info = { | |
| 'name': field['name'], | |
| 'label': field['label'], | |
| 'type': field['type'], | |
| 'required': not field['nillable'] and not field['defaultedOnCreate'], | |
| 'length': field.get('length', 0), | |
| 'picklistValues': [pv['value'] for pv in field.get('picklistValues', [])] | |
| } | |
| schema['fields'].append(field_info) | |
| object_schemas[object_name] = schema | |
| return object_schemas[object_name] | |
| except Exception as e: | |
| logger.error(f"Error getting schema for {object_name}: {str(e)}") | |
| return {} | |
| def generate_test_data(object_name, fields, num_records=100): | |
| """Generate test data using Faker for specified object and fields""" | |
| try: | |
| schema = get_object_schema(object_name) | |
| if not schema: | |
| return None, "β Could not get object schema" | |
| records = [] | |
| for _ in range(num_records): | |
| record = {} | |
| for field_name in fields: | |
| field_info = next((f for f in schema['fields'] if f['name'] == field_name), None) | |
| if not field_info: | |
| continue | |
| field_type = field_info['type'] | |
| # Generate data based on field type and name | |
| if field_name.lower() in ['firstname', 'first_name']: | |
| record[field_name] = fake.first_name() | |
| elif field_name.lower() in ['lastname', 'last_name']: | |
| record[field_name] = fake.last_name() | |
| elif field_name.lower() in ['name'] and object_name == 'Account': | |
| record[field_name] = fake.company() | |
| elif field_name.lower() in ['email']: | |
| record[field_name] = fake.email() | |
| elif field_name.lower() in ['phone']: | |
| record[field_name] = fake.phone_number() | |
| elif field_name.lower() in ['website']: | |
| record[field_name] = fake.url() | |
| elif field_name.lower() in ['street', 'mailingstreet', 'billingstreet']: | |
| record[field_name] = fake.street_address() | |
| elif field_name.lower() in ['city', 'mailingcity', 'billingcity']: | |
| record[field_name] = fake.city() | |
| elif field_name.lower() in ['state', 'mailingstate', 'billingstate']: | |
| record[field_name] = fake.state_abbr() | |
| elif field_name.lower() in ['postalcode', 'mailingpostalcode', 'billingpostalcode']: | |
| record[field_name] = fake.zipcode() | |
| elif field_name.lower() in ['country', 'mailingcountry', 'billingcountry']: | |
| record[field_name] = 'US' | |
| elif field_type == 'picklist' and field_info['picklistValues']: | |
| record[field_name] = random.choice(field_info['picklistValues']) | |
| elif field_type == 'boolean': | |
| record[field_name] = random.choice([True, False]) | |
| elif field_type in ['int', 'double', 'currency']: | |
| if 'annual' in field_name.lower() or 'revenue' in field_name.lower(): | |
| record[field_name] = random.randint(100000, 10000000) | |
| else: | |
| record[field_name] = random.randint(1, 1000) | |
| elif field_type == 'date': | |
| record[field_name] = fake.date_between(start_date='-1y', end_date='today').isoformat() | |
| elif field_type == 'datetime': | |
| record[field_name] = fake.date_time_between(start_date='-1y', end_date='now').isoformat() | |
| elif field_type in ['string', 'textarea']: | |
| if field_info['length'] > 100: | |
| record[field_name] = fake.text(max_nb_chars=min(field_info['length'], 200)) | |
| else: | |
| record[field_name] = fake.sentence(nb_words=3)[:field_info['length']] | |
| else: | |
| # Default string value | |
| record[field_name] = f"Test {field_name}" | |
| records.append(record) | |
| # Create DataFrame | |
| df = pd.DataFrame(records) | |
| # Save to CSV | |
| filename = f"test_data_{object_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" | |
| df.to_csv(filename, index=False) | |
| return filename, f"β Generated {num_records} test records for {object_name}\nFields: {', '.join(fields)}\nFile: {filename}" | |
| except Exception as e: | |
| logger.error(f"Error generating test data: {str(e)}") | |
| return None, f"β Error generating test data: {str(e)}" | |
| def enhanced_salesforce_operation(username, password, security_token, sandbox, operation, | |
| object_name, selected_fields, csv_file, num_records): | |
| """Enhanced Salesforce operations with full functionality""" | |
| global sf_connection | |
| # Step 1: Connect to Salesforce | |
| if not username or not password or not security_token: | |
| return "β Please provide username, password, and security token", None, "[]", "[]" | |
| try: | |
| domain = 'test' if sandbox else None | |
| sf_connection = Salesforce( | |
| username=username, | |
| password=password, | |
| security_token=security_token, | |
| domain=domain | |
| ) | |
| connection_msg = f"β Connected to Salesforce as {username}\n" | |
| # Get available objects | |
| objects = get_salesforce_objects() | |
| objects_json = json.dumps(objects) | |
| # Step 2: Handle different operations | |
| if operation == "connect_only": | |
| return connection_msg + f"Available objects: {', '.join(objects)}", None, objects_json, "[]" | |
| elif operation == "get_schema": | |
| if not object_name: | |
| return connection_msg + "β Please select an object", None, objects_json, "[]" | |
| schema = get_object_schema(object_name) | |
| fields = [f"{f['name']} ({f['type']})" for f in schema.get('fields', [])] | |
| fields_json = json.dumps([f['name'] for f in schema.get('fields', [])]) | |
| return (connection_msg + f"π Schema for {object_name}:\n" + | |
| f"Fields: {len(fields)}\n" + "\n".join(fields[:20]) + | |
| (f"\n... and {len(fields)-20} more fields" if len(fields) > 20 else "")), None, objects_json, fields_json | |
| elif operation == "generate_data": | |
| if not object_name or not selected_fields: | |
| return connection_msg + "β Please select object and fields", None, objects_json, "[]" | |
| fields_list = selected_fields.split(',') if isinstance(selected_fields, str) else selected_fields | |
| filename, result = generate_test_data(object_name, fields_list, num_records) | |
| return connection_msg + result, filename, objects_json, "[]" | |
| elif operation == "import_data": | |
| if not csv_file: | |
| return connection_msg + "β Please upload a CSV file", None, objects_json, "[]" | |
| if not object_name: | |
| return connection_msg + "β Please select target object", None, objects_json, "[]" | |
| # Read and process file | |
| try: | |
| if csv_file.name.endswith('.csv'): | |
| df = pd.read_csv(csv_file.name) | |
| elif csv_file.name.endswith(('.xlsx', '.xls')): | |
| df = pd.read_excel(csv_file.name) | |
| else: | |
| return connection_msg + "β Please upload a CSV or Excel file", None, objects_json, "[]" | |
| if df.empty: | |
| return connection_msg + "β The uploaded file is empty", None, objects_json, "[]" | |
| # Clean data | |
| records = df.to_dict('records') | |
| cleaned_records = [] | |
| for record in records: | |
| cleaned_record = {k: v for k, v in record.items() if pd.notna(v)} | |
| cleaned_records.append(cleaned_record) | |
| # Import using bulk API | |
| result = sf_connection.bulk.__getattr__(object_name).insert(cleaned_records) | |
| # Process results | |
| success_count = sum(1 for r in result if r.get('success')) | |
| error_count = len(result) - success_count | |
| import_msg = f"\nπ€ Import Results:\n" | |
| import_msg += f"Object: {object_name}\n" | |
| import_msg += f"Total records: {len(records)}\n" | |
| import_msg += f"β Successful: {success_count}\n" | |
| import_msg += f"β Failed: {error_count}\n" | |
| if error_count > 0: | |
| errors = [r.get('errors', []) for r in result if not r.get('success')] | |
| import_msg += f"\nFirst few errors: {str(errors[:3])}" | |
| return connection_msg + import_msg, None, objects_json, "[]" | |
| except Exception as e: | |
| return connection_msg + f"β Import error: {str(e)}", None, objects_json, "[]" | |
| elif operation == "export_data": | |
| if not object_name: | |
| return connection_msg + "β Please select an object", None, objects_json, "[]" | |
| try: | |
| schema = get_object_schema(object_name) | |
| # Use selected fields or default fields | |
| if selected_fields: | |
| fields_list = selected_fields.split(',') if isinstance(selected_fields, str) else selected_fields | |
| fields_list = [f.strip() for f in fields_list] | |
| else: | |
| # Use first 10 fields as default | |
| fields_list = [f['name'] for f in schema.get('fields', [])[:10]] | |
| fields_str = ', '.join(fields_list) | |
| query = f"SELECT {fields_str} FROM {object_name} LIMIT 100" | |
| result = sf_connection.query_all(query) | |
| records = result['records'] | |
| if records: | |
| df = pd.DataFrame(records) | |
| if 'attributes' in df.columns: | |
| df = df.drop('attributes', axis=1) | |
| # Save export file | |
| export_file = f"export_{object_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" | |
| df.to_csv(export_file, index=False) | |
| export_msg = f"\nπ₯ Export Results:\n" | |
| export_msg += f"Object: {object_name}\n" | |
| export_msg += f"Records exported: {len(records)}\n" | |
| export_msg += f"Fields: {', '.join(df.columns)}\n" | |
| export_msg += f"Sample data:\n{df.head(3).to_string()}" | |
| return connection_msg + export_msg, export_file, objects_json, "[]" | |
| else: | |
| return connection_msg + f"\nβ No {object_name} records found", None, objects_json, "[]" | |
| except Exception as e: | |
| return connection_msg + f"\nβ Export error: {str(e)}", None, objects_json, "[]" | |
| else: | |
| return connection_msg + "β Invalid operation", None, objects_json, "[]" | |
| except Exception as e: | |
| error_msg = str(e) | |
| if "INVALID_LOGIN" in error_msg: | |
| return "β Invalid credentials. Please check your username, password, and security token.", None, "[]", "[]" | |
| elif "API_DISABLED_FOR_ORG" in error_msg: | |
| return "β API access is disabled. Contact your Salesforce admin.", None, "[]", "[]" | |
| elif "LOGIN_MUST_USE_SECURITY_TOKEN" in error_msg: | |
| return "β Security token required. Append it to your password.", None, "[]", "[]" | |
| else: | |
| return f"β Connection failed: {error_msg}", None, "[]", "[]" | |
| # Create the enhanced interface | |
| demo = gr.Interface( | |
| fn=enhanced_salesforce_operation, | |
| inputs=[ | |
| gr.Textbox(label="Username", placeholder="[email protected]"), | |
| gr.Textbox(label="Password", type="password"), | |
| gr.Textbox(label="Security Token", type="password"), | |
| gr.Checkbox(label="Sandbox Environment"), | |
| gr.Dropdown( | |
| label="Operation", | |
| choices=[ | |
| "connect_only", | |
| "get_schema", | |
| "generate_data", | |
| "import_data", | |
| "export_data" | |
| ], | |
| value="connect_only" | |
| ), | |
| gr.Dropdown(label="Salesforce Object", choices=[], allow_custom_value=True), | |
| gr.Textbox(label="Selected Fields (comma-separated)", placeholder="Name,Email,Phone"), | |
| gr.File(label="CSV/Excel File (for import)", file_types=[".csv", ".xlsx", ".xls"]), | |
| gr.Slider(label="Number of Test Records", minimum=10, maximum=1000, value=100, step=10) | |
| ], | |
| outputs=[ | |
| gr.Textbox(label="Results", lines=15), | |
| gr.File(label="Download File"), | |
| gr.Textbox(label="Available Objects (JSON)", visible=False), | |
| gr.Textbox(label="Available Fields (JSON)", visible=False) | |
| ], | |
| title="π Enhanced Salesforce Data Loader", | |
| description=""" | |
| **Advanced Salesforce Data Management Tool** | |
| **Workflow:** | |
| 1. **Connect**: Enter credentials, select 'connect_only' to see available objects | |
| 2. **Get Schema**: Select object, choose 'get_schema' to see fields | |
| 3. **Generate Data**: Select fields, choose 'generate_data' to create test data with Faker | |
| 4. **Import**: Upload CSV/Excel, select target object, choose 'import_data' | |
| 5. **Export**: Select object and fields, choose 'export_data' | |
| **Features:** | |
| - β Live object detection from your Salesforce org | |
| - β Dynamic schema reading with field types | |
| - β Intelligent test data generation using Faker | |
| - β Field mapping and validation | |
| - β Bulk operations for performance | |
| - β Relationship data support (Account + Contact) | |
| """, | |
| examples=[ | |
| ["[email protected]", "password123", "token123", False, "connect_only", "", "", None, 100], | |
| ] | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch(server_name="0.0.0.0", server_port=7860) |