Spaces:
Sleeping
Sleeping
| import datetime | |
| import random | |
| import logging | |
| import contextlib | |
| import io | |
| from typing import Any, Callable, Dict, List, Tuple, Union, Type | |
| from pydantic import BaseModel, Field | |
| from playwright.sync_api import sync_playwright | |
| import duckduckgo_search | |
| # External modules (you must have these implemented) | |
| from app.email_tool import send_email, generate_email | |
| from app.repl_tool import run_python_code | |
| from app.translation_tool import translate | |
| from app.vision import describe_image | |
| from langchain.agents import Tool | |
| # === Logging Setup === | |
| logger = logging.getLogger(__name__) | |
| logging.basicConfig(level=logging.INFO) | |
| # === Structured Input Schemas === | |
| class ExecInput(BaseModel): | |
| code: str = Field(..., description="Python code to execute") | |
| class WebSearchInput(BaseModel): | |
| url: str = Field(..., description="URL to browse for content") | |
| # === Python Code Execution === | |
| def exec_py(inputs: ExecInput) -> str: | |
| """ | |
| Securely execute Python code and return output or error. | |
| """ | |
| output = io.StringIO() | |
| local_vars = {} | |
| try: | |
| with contextlib.redirect_stdout(output): | |
| exec(inputs.code, {}, local_vars) | |
| return output.getvalue().strip() or "✅ Code executed successfully." | |
| except Exception as e: | |
| logger.error(f"exec_py error: {e}") | |
| return f"❌ Error: {e}" | |
| # === Simple Tools === | |
| def calc(expression: str) -> str: | |
| """ | |
| Evaluate a simple math expression. | |
| """ | |
| try: | |
| result = eval(expression, {"__builtins__": None}, {}) | |
| return str(result) | |
| except Exception as e: | |
| logger.error(f"Calc error for '{expression}': {e}") | |
| return "❌ Invalid expression" | |
| def time_now(_: str = "") -> str: | |
| return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| def joke(_: str = "") -> str: | |
| return random.choice([ | |
| "Parallel lines have so much in common—they’ll never meet.", | |
| "Why don’t scientists trust atoms? Because they make up everything!", | |
| "I told my computer I needed a break, and it said no problem — it needed one too." | |
| ]) | |
| def browse(inputs: WebSearchInput) -> str: | |
| """ | |
| Fetch and return the text content of a web page. | |
| """ | |
| try: | |
| with sync_playwright() as pw: | |
| browser = pw.chromium.launch(headless=True) | |
| page = browser.new_page() | |
| page.goto(inputs.url, timeout=10000) | |
| text = page.text_content("body") or "" | |
| browser.close() | |
| return text[:500] if text else "❌ No content found." | |
| except Exception as e: | |
| logger.error(f"Browse error: {e}") | |
| return f"❌ Error browsing: {e}" | |
| def ddg_search(query: str) -> List[str]: | |
| try: | |
| results = duckduckgo_search.DuckDuckGoSearch().text(query) | |
| return results[:3] if results else ["No results"] | |
| except Exception as e: | |
| logger.error(f"DuckDuckGo search error: {e}") | |
| return ["❌ Search failed"] | |
| # === LangChain-Formatted Tool List === | |
| def get_tools() -> List[Tool]: | |
| return [ | |
| Tool( | |
| name="Code Interpreter", | |
| func=run_python_code, | |
| description="🧠 Executes Python code and returns results.", | |
| ), | |
| Tool( | |
| name="Email Generator", | |
| func=lambda prompt: generate_email("Customer", prompt, 15), | |
| description="📧 Generates a promotional or customer email.", | |
| ), | |
| Tool( | |
| name="Translate Text", | |
| func=translate, | |
| description="🌐 Translates input text to a selected language.", | |
| ), | |
| Tool( | |
| name="Describe Image", | |
| func=describe_image, | |
| description="🖼️ Describes the contents of an image.", | |
| ), | |
| Tool( | |
| name="Browse Website", | |
| func=browse, | |
| description="🔎 Browse and fetch page content.", | |
| args_schema=WebSearchInput, | |
| ), | |
| Tool( | |
| name="Python Executor", | |
| func=exec_py, | |
| description="🐍 Execute Python code and return output.", | |
| args_schema=ExecInput, | |
| ), | |
| ] | |
| # === TOOL MAPPING === | |
| TOOLS: Dict[str, Union[Callable[..., Any], Tuple[Type[BaseModel], Callable[..., Any]]]] = { | |
| "calc": calc, | |
| "time": time_now, | |
| "joke": joke, | |
| "search": ddg_search, | |
| "browse": (WebSearchInput, browse), | |
| "exec_python": (ExecInput, exec_py), | |
| "send_email": send_email, | |
| "run_code": run_python_code, | |
| "translate": translate, | |
| "describe_image": describe_image, | |
| } | |
| # === Dynamic Tool Execution === | |
| def use_tool(tool_name: str, *args, **kwargs) -> Any: | |
| tool = TOOLS.get(tool_name) | |
| if not tool: | |
| return {"error": f"Tool '{tool_name}' not found"} | |
| if isinstance(tool, tuple): | |
| schema_cls, func = tool | |
| try: | |
| input_data = args[0] if args else kwargs | |
| validated = schema_cls.parse_obj(input_data) | |
| return func(validated) | |
| except Exception as e: | |
| logger.error(f"Validation failed for '{tool_name}': {e}") | |
| return {"error": f"Invalid input for tool '{tool_name}': {e}"} | |
| else: | |
| try: | |
| return tool(*args, **kwargs) | |
| except Exception as e: | |
| logger.error(f"Execution failed for '{tool_name}': {e}") | |
| return {"error": f"Tool execution failed: {e}"} | |
| # === Dispatcher === | |
| def tool_dispatch(command: str, args: Any) -> Any: | |
| if command not in TOOLS: | |
| return {"error": f"Unknown tool '{command}'"} | |
| tool = TOOLS[command] | |
| if isinstance(args, (list, tuple)): | |
| return use_tool(command, *args) | |
| elif isinstance(args, dict): | |
| return use_tool(command, args) | |
| else: | |
| return use_tool(command, args) | |