# -*- coding: utf-8 -*-
"""
Gradio Chat Interface for an MCP (Model Context Protocol) Client
using the Hugging Face Inference API for the Language Model.
"""
import asyncio
import os
import json
from typing import List, Dict, Any, Union, Optional, Tuple
from contextlib import AsyncExitStack
import logging
import traceback

# Third-party libraries
import httpx  # For async HTTP requests
import gradio as gr
from gradio.components.chatbot import ChatMessage  # Although type="messages" uses dicts primarily
from dotenv import load_dotenv

# MCP specific imports
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# --- Configuration ---
load_dotenv()  # Load environment variables from .env file

# Hugging Face API Configuration
HF_TOKEN = os.getenv("HF_TOKEN")
# Specify the desired Hugging Face model endpoint
HF_API_URL = "https://router.huggingface.co/hf-inference/models/Qwen/Qwen3-235B-A22B/v1/chat/completions"
MODEL_NAME = "Qwen/Qwen3-235B-A22B"  # Model name for payload and display
MAX_TOKENS = 1500  # Max tokens for the LLM response
HTTP_TIMEOUT = 120  # Increased timeout for potentially slow model responses

# Default MCP Server Script Path
DEFAULT_SERVER_SCRIPT = "gradio_mcp_server.py"
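
# A minimal .env file for this script might look like the following
# (placeholder value, not a real token):
#   HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxx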

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,  # Set to DEBUG for more verbose output
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)
logger = logging.getLogger(__name__)

# --- Async Event Loop ---
# Get the current event loop or create a new one if none exists
try:
    loop = asyncio.get_running_loop()
except RuntimeError:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
logger.info("Asyncio event loop initialized.")

# --- MCP Client Wrapper Class ---
class MCPClientWrapper:
    """
    Manages the connection to the MCP server, interaction with the Hugging Face API,
    and Gradio message processing logic.
    """

    def __init__(self):
        """Initializes the wrapper, loading configuration."""
        self.session: Optional[ClientSession] = None
        self.exit_stack: Optional[AsyncExitStack] = None
        self.tools: List[Dict[str, Any]] = []
        self.http_client: Optional[httpx.AsyncClient] = None
        # stdio read/write streams, set in _connect once the transport is up.
        self.stdio = None
        self.write = None
        self.hf_token: Optional[str] = os.getenv("HF_TOKEN")
        if not self.hf_token:
            logger.warning("HF_TOKEN environment variable not found. Hugging Face API calls will be disabled.")
        else:
            # Log only a part of the token for verification, NEVER the full token.
            logger.info(f"HF_TOKEN loaded successfully (starts with: {self.hf_token[:4]}...).")

    async def _connect(self, server_path: str) -> str:
        """Establishes the connection to the MCP server and initializes the HTTP client."""
        # Gracefully close existing resources if reconnecting
        if self.exit_stack:
            logger.info("Closing existing connection and resources before reconnecting.")
            await self.exit_stack.aclose()
            # Explicitly reset state variables
            self.exit_stack = None
            self.session = None
            self.http_client = None
            self.tools = []

        logger.info(f"Attempting to connect to MCP server script: {server_path}")
        self.exit_stack = AsyncExitStack()
        try:
            # Determine server command (python or node)
            is_python = server_path.lower().endswith('.py')
            command = "python" if is_python else "node"
            logger.info(f"Using command '{command}' for server.")

            # Configure MCP server parameters
            server_params = StdioServerParameters(
                command=command,
                args=[server_path],
                env={"PYTHONIOENCODING": "utf-8", "PYTHONUNBUFFERED": "1"}
            )

            # --- Establish MCP Connection ---
            logger.info("Initializing stdio transport...")
            stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
            self.stdio, self.write = stdio_transport
            logger.info("Stdio transport established.")

            logger.info("Initializing MCP client session...")
            self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
            await self.session.initialize()
            logger.info("MCP session initialized successfully.")

            # --- Initialize HTTP Client for Hugging Face ---
            if self.hf_token:
                logger.info("Initializing HTTP client for Hugging Face API...")
                self.http_client = await self.exit_stack.enter_async_context(
                    httpx.AsyncClient(timeout=HTTP_TIMEOUT)
                )
                logger.info("HTTP client initialized successfully.")
            else:
                logger.warning("HTTP client NOT initialized because HF_TOKEN is missing.")
                self.http_client = None  # Ensure it's None

            # --- List Available MCP Tools ---
            logger.info("Listing available tools from MCP server...")
            response = await self.session.list_tools()
            self.tools = [{
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.inputSchema  # Keep schema for potential richer prompts
            } for tool in response.tools]
            tool_names = [tool["name"] for tool in self.tools]
            logger.info(f"Available tools retrieved: {tool_names if tool_names else 'None'}")

            # --- Prepare Connection Status Message ---
            connection_status = f"Connected to MCP server. Available tools: {', '.join(tool_names) if tool_names else 'None'}."
            if not self.http_client:
                connection_status += " Warning: Hugging Face client is INACTIVE (missing token)."
            return connection_status
        except Exception as e:
            logger.error(f"Connection failed: {e}", exc_info=True)
            # Ensure cleanup if the connection fails at any point
            if self.exit_stack:
                await self.exit_stack.aclose()
            self.exit_stack = None
            self.session = None
            self.http_client = None
            return f"Connection Failed: {e}"

    def connect(self, server_path: str) -> str:
        """Synchronous wrapper for the async connect method."""
        return loop.run_until_complete(self._connect(server_path))

    def _format_tools_for_prompt(self) -> str:
        """Formats the available tool descriptions for the LLM prompt."""
        if not self.tools:
            return "No tools are available for use."
        tool_descriptions = []
        for tool in self.tools:
            desc = f"- Tool Name: `{tool['name']}`\n"
            desc += f"  Description: {tool['description']}\n"
            # Optionally include schema for complex tools; keep it concise if possible
            desc += f"  Input Format (JSON Schema): {json.dumps(tool['input_schema'])}"
            tool_descriptions.append(desc)
        # Specific instructions for the LLM on how to invoke a tool
        instruction = (
            "You have access to the following tools:\n"
            f"{chr(10).join(tool_descriptions)}\n\n"  # Use newline character explicitly
            "To use a tool, you MUST respond ONLY with a single JSON object "
            "containing 'tool_name' and 'tool_input' keys, like this:\n"
            "```json\n"
            "{\n"
            '  "tool_name": "<name_of_tool>",\n'
            '  "tool_input": { <arguments_object> }\n'
            "}\n"
            "```\n"
            "Do not include any other text, markdown formatting, or explanations "
            "before or after the JSON object when calling a tool."
        )
        return instruction
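
    # For illustration, a conforming tool-call reply from the LLM would look like
    # this (hypothetical tool name and arguments):
    #   {"tool_name": "letter_counter", "tool_input": {"word": "strawberry", "letter": "r"}}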

    def _build_system_prompt(self) -> str:
        """Constructs the system prompt, including tool usage instructions."""
        base_prompt = "You are a helpful assistant. Respond concisely and accurately."
        tool_info = self._format_tools_for_prompt()
        # Only add tool info if tools are actually available
        if self.tools:
            return f"{base_prompt}\n\n{tool_info}"
        else:
            return base_prompt

    async def _call_huggingface_api(self, messages: List[Dict[str, str]]) -> Dict[str, Any]:
        """Makes the API call to the Hugging Face Inference endpoint."""
        # This function assumes self.hf_token and self.http_client are valid,
        # checked by the calling function (_process_query).
        headers = {
            "Authorization": f"Bearer {self.hf_token}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": MODEL_NAME,
            "messages": messages,
            "max_tokens": MAX_TOKENS,
            "stream": False,  # Use non-streaming for simplicity
            # Optional parameters:
            # "temperature": 0.7,
            # "top_p": 0.9,
        }
        logger.info(f"Sending request to HF API ({MODEL_NAME}). Message count: {len(messages)}.")
        # Avoid logging the full payload in production if it contains sensitive data.
        # logger.debug(f"Payload (first message role): {messages[0]['role'] if messages else 'N/A'}")
        response = None  # Keep a handle for error reporting below
        try:
            # Ensure http_client exists (redundant check for safety)
            if not self.http_client:
                logger.error("FATAL: _call_huggingface_api called but self.http_client is None!")
                return {"error": "Internal state error: HTTP client is missing."}
            response = await self.http_client.post(HF_API_URL, headers=headers, json=payload)
            response.raise_for_status()  # Raises HTTPStatusError for 4xx/5xx responses
            logger.info(f"Received successful response from HF API (Status: {response.status_code}).")
            return response.json()
        except httpx.HTTPStatusError as e:
            logger.error(f"HF API HTTP error: {e.response.status_code} - Response: {e.response.text}", exc_info=True)
            return {"error": f"API request failed ({e.response.status_code})", "details": e.response.text}
        except httpx.TimeoutException as e:
            logger.error(f"HF API request timed out after {HTTP_TIMEOUT}s: {e}", exc_info=True)
            return {"error": "API request timed out."}
        except httpx.RequestError as e:
            logger.error(f"HF API request error: {e}", exc_info=True)
            return {"error": f"API request failed: {e}"}
        except json.JSONDecodeError as e:
            # Handle cases where the response body is not valid JSON.
            response_text = response.text if response is not None else "Unknown response"
            logger.error(f"Failed to decode JSON response from HF API: {e}. Raw text: {response_text}", exc_info=True)
            return {"error": "Invalid JSON response from API.", "raw_response": response_text}
        except Exception as e:
            # Catch any other unexpected errors during the API call
            logger.error(f"An unexpected error occurred during HF API call: {e}", exc_info=True)
            return {"error": f"An unexpected error occurred: {e}"}

    def process_message(self, message: str, history: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], Dict]:
        """
        Handles incoming user messages, processes them using the LLM and tools,
        and returns the updated conversation history for Gradio.

        Args:
            message: The new message text from the user.
            history: The current conversation history (list of {'role': ..., 'content': ...} dicts).

        Returns:
            A tuple containing:
            - The complete updated conversation history (list of dicts).
            - A Gradio update dictionary to clear the input textbox.
        """
        logger.info(f"Processing message: '{message[:50]}...'")
        logger.debug(f"Received history (type: {type(history)}, len: {len(history)}).")
        if history:
            logger.debug(f"First history item type: {type(history[0])}, Keys: {history[0].keys() if isinstance(history[0], dict) else 'N/A'}")

        # --- Create a working copy of the history ---
        # Avoids modifying the state Gradio passed in directly.
        current_conversation_history = list(history)

        # --- Validate Connection State ---
        if not self.session:
            logger.warning("MCP session not available in process_message. Aborting.")
            current_conversation_history.append({"role": "user", "content": message})
            current_conversation_history.append({"role": "assistant", "content": "Error: Not connected to MCP server. Please connect first."})
            return current_conversation_history, gr.update(value="")  # Clear input
        if not self.http_client or not self.hf_token:
            logger.warning("Hugging Face client/token not ready in process_message. Aborting.")
            current_conversation_history.append({"role": "user", "content": message})
            current_conversation_history.append({"role": "assistant", "content": "Error: Hugging Face client is not configured (missing token or connection issue?). Cannot process request."})
            return current_conversation_history, gr.update(value="")  # Clear input

        # --- Append User Message to Working History ---
        current_conversation_history.append({"role": "user", "content": message})

        # --- Process Query Asynchronously ---
        # Pass the full history (including the new user message) to the async worker.
        # Expect a list of *new* assistant messages generated in this turn.
        try:
            new_assistant_messages: List[Dict[str, Any]] = loop.run_until_complete(
                self._process_query(current_conversation_history)
            )
        except Exception as e:
            # Catch unexpected errors during the async processing itself
            logger.error(f"Error during loop.run_until_complete(_process_query): {e}", exc_info=True)
            # Add an error message to the output
            new_assistant_messages = [{
                "role": "assistant",
                "content": f"An internal error occurred while processing your request: {e}"
            }]

        # --- Combine History for Return ---
        # final_history includes the original history, the user message, and the new assistant messages.
        final_history = current_conversation_history + new_assistant_messages
        logger.debug(f"Returning updated history (len: {len(final_history)}).")

        # --- Return Updated State to Gradio ---
        return final_history, gr.update(value="")  # Return new history and clear input

    async def _process_query(self, conversation_history: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Async function to handle the core logic: call the LLM, handle potential tool calls.

        Args:
            conversation_history: The full conversation history up to and including
                the latest user message.

        Returns:
            A list containing the new assistant message(s) generated in this turn
            (text response, tool interactions, errors, etc.).
        """
        # List to hold the new message(s) generated by the assistant in this turn.
        new_turn_messages = []

        # --- Prepare Messages for LLM API ---
        hf_messages = []
        # Add system prompt if not already present or if history is empty
        if not conversation_history or conversation_history[0].get("role") != "system":
            logger.debug("Adding system prompt.")
            hf_messages.append({"role": "system", "content": self._build_system_prompt()})
        # Process conversation history for the API call
        for msg in conversation_history:
            role = msg.get("role")
            content = msg.get("content")
            if not role or content is None:
                logger.warning(f"Skipping message with missing role/content: {msg}")
                continue
            content_str = content if isinstance(content, str) else json.dumps(content)
            # Add valid roles; prevent duplicate system prompts if handled above
            if role in ["user", "assistant"]:
                hf_messages.append({"role": role, "content": content_str})
            elif role == "system" and not hf_messages:  # Only add if a system prompt wasn't added at the start
                hf_messages.append({"role": role, "content": content_str})

        # --- Pre-API Call State Check ---
        token_ok = bool(self.hf_token)
        # Ensure http_client is not None and is the correct type
        client_ok = isinstance(self.http_client, httpx.AsyncClient)
        logger.info(f"State before API call: Token OK? {token_ok}, HTTP Client OK? {client_ok}")
        if not (token_ok and client_ok):
            logger.error("Pre-API call check FAILED: Token or Client not ready.")
            new_turn_messages.append({
                "role": "assistant",
                "content": "Internal Error: API client configuration problem detected before making the call."
            })
            return new_turn_messages  # Return error message

        # --- Make the First API Call ---
        logger.info("Making initial call to Hugging Face API...")
        response_data = await self._call_huggingface_api(hf_messages)

        # --- Handle Initial API Response ---
        if not response_data or "error" in response_data:
            error_msg = response_data.get("error", "Unknown API error") if response_data else "No response received"
            details = response_data.get("details", "") if response_data else ""
            logger.error(f"Initial API call failed: {error_msg}")
            new_turn_messages.append({
                "role": "assistant",
                "content": f"Sorry, there was an error calling the language model: {error_msg}" + (f"\nDetails: ```\n{details}\n```" if details else "")
            })
            return new_turn_messages  # Return list with error message

        # --- Extract Assistant Content ---
        try:
            assistant_content = response_data.get("choices", [{}])[0].get("message", {}).get("content", "")
            # Fallback for models that might use 'generated_text'
            if not assistant_content and "generated_text" in response_data:
                assistant_content = response_data["generated_text"]
            if not assistant_content:
                logger.error(f"Could not extract assistant content. Response keys: {response_data.keys()}")
                raise ValueError("Empty or missing assistant content in API response.")
            logger.info("Successfully extracted assistant content from initial response.")
            # logger.debug(f"Assistant raw content: {assistant_content}")  # Be cautious logging full content
        except (KeyError, IndexError, ValueError, TypeError) as e:
            logger.error(f"Error parsing initial API response structure: {e}. Response: {response_data}", exc_info=True)
            new_turn_messages.append({
                "role": "assistant",
                "content": f"Sorry, I received an unexpected response format from the language model. Parsing Error: {e}"
            })
            return new_turn_messages  # Return list with error message

        # --- Check for Tool Use Request ---
        tool_call_data = None
        try:
            # The LLM was instructed to respond *only* with JSON for tool calls
            potential_tool_call = json.loads(assistant_content)
            # Validate whether it matches the expected tool call structure
            if isinstance(potential_tool_call, dict) and "tool_name" in potential_tool_call and "tool_input" in potential_tool_call:
                tool_call_data = potential_tool_call
                logger.info(f"Detected tool call request for: {tool_call_data['tool_name']}")
            else:
                # Valid JSON, but not the specific format we requested for tools
                logger.info("Assistant response is valid JSON, but not a recognized tool call format. Treating as text.")
                # Keep assistant_content as is; tool_call_data remains None
        except json.JSONDecodeError:
            # Not JSON, so definitely treat as a regular text response
            logger.info("Assistant response is not JSON, treating as standard text response.")
            # Keep assistant_content as is; tool_call_data remains None

        # --- Process Based on Tool Call or Text ---
        if tool_call_data:
            # --- Handle Tool Call ---
            tool_name = tool_call_data.get("tool_name")
            tool_args = tool_call_data.get("tool_input", {})  # Default to empty dict if missing
            available_tool_names = [t["name"] for t in self.tools]
            if not tool_name or tool_name not in available_tool_names:
                logger.warning(f"LLM requested invalid or unavailable tool: '{tool_name}'")
                new_turn_messages.append({
                    "role": "assistant",
                    "content": f"I tried to use a tool named '{tool_name}', but it seems it's not available or the request was malformed. I will proceed without it."
                })
                # NOTE: Consider calling the LLM again here to inform it the tool failed.
                # For simplicity, we just return the warning message for now.
                return new_turn_messages

            # --- Tool is valid, proceed ---
            logger.info(f"Executing valid tool call: {tool_name}")
            # Add messages to Gradio indicating tool use initiation
            new_turn_messages.append({
                "role": "assistant",
                "content": f"Okay, I need to use the **{tool_name}** tool.",
                "metadata": {"title": f"⏳ Using tool: {tool_name}", "status": "pending", "id": f"tool_call_{tool_name}"}
            })
            # Display the parameters used (ensure_ascii=False keeps non-ASCII readable)
            new_turn_messages.append({
                "role": "assistant",
                "content": f"Parameters:\n```json\n{json.dumps(tool_args, indent=2, ensure_ascii=False)}\n```",
                "metadata": {"parent_id": f"tool_call_{tool_name}", "id": f"params_{tool_name}", "title": "Tool Parameters"}
            })

            # --- Call the Actual MCP Tool ---
            try:
                mcp_result = await self.session.call_tool(tool_name, tool_args)
                # MCP tool results carry a list of content blocks; join the text
                # blocks into a single string for display and the follow-up LLM call.
                tool_result_content = "\n".join(
                    block.text for block in mcp_result.content if hasattr(block, "text")
                )
                logger.info(f"Successfully received result from MCP tool: {tool_name}")
                # Update the Gradio message status to 'done'
                if new_turn_messages and "metadata" in new_turn_messages[-2]:
                    new_turn_messages[-2]["metadata"]["status"] = "done"
                    new_turn_messages[-2]["metadata"]["title"] = f"✅ Used tool: {tool_name}"

                # --- Display Tool Result in Gradio ---
                new_turn_messages.append({
                    "role": "assistant",
                    "content": f"Result from **{tool_name}**:",
                    "metadata": {"title": f"Tool Result: {tool_name}", "status": "done", "id": f"result_{tool_name}"}
                })
                # Format the result for display (handle JSON, images, etc.)
                display_content = tool_result_content
                try:
                    result_json = json.loads(tool_result_content)
                    if isinstance(result_json, dict) and result_json.get("type") == "image" and "url" in result_json:
                        # Handle image result - the Gradio chatbot can display images via a dict path
                        display_content = {"path": result_json["url"], "alt_text": result_json.get("message", "Generated image")}
                        new_turn_messages.append({
                            "role": "assistant", "content": display_content,  # Send the dict
                            "metadata": {"parent_id": f"result_{tool_name}", "id": f"image_{tool_name}", "title": "Image Result"}
                        })
                        display_content = None  # Mark as handled so the raw result isn't added below
                    else:
                        # Nicely format other JSON
                        display_content = f"```json\n{json.dumps(result_json, indent=2, ensure_ascii=False)}\n```"
                except (json.JSONDecodeError, TypeError):
                    # Not JSON or an image; display as a plain code block if not empty
                    display_content = f"```\n{tool_result_content}\n```" if tool_result_content else "_Tool returned empty content._"
                # Add the formatted/raw result if not handled above (e.g., image)
                if display_content:
                    new_turn_messages.append({
                        "role": "assistant", "content": display_content,
                        "metadata": {"parent_id": f"result_{tool_name}", "id": f"raw_result_{tool_name}", "title": "Formatted Output"}
                    })
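                # Note: the image branch above assumes a tool that returns JSON shaped
                # like {"type": "image", "url": ..., "message": ...}; this convention is
                # specific to the companion server, not part of MCP itself.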

                # --- Send Tool Result Back to LLM for Final Response ---
                # Prepare the message history for the second LLM call
                hf_messages_for_final_call = list(hf_messages)  # Start with messages from the first call
                # Add the assistant's message that *was* the tool call JSON
                hf_messages_for_final_call.append({"role": "assistant", "content": assistant_content})
                # Add a user message containing the tool's result
                hf_messages_for_final_call.append({
                    "role": "user",
                    "content": f"The '{tool_name}' tool execution resulted in:\n```\n{tool_result_content}\n```\nPlease summarize this result or continue based on it."
                })
                logger.info("Sending tool result back to HF API for final interpretation.")

                # --- Pre-API Call State Check (Again) ---
                token_ok_final = bool(self.hf_token)
                client_ok_final = isinstance(self.http_client, httpx.AsyncClient)
                logger.info(f"State before final API call: Token OK? {token_ok_final}, HTTP Client OK? {client_ok_final}")
                if not (token_ok_final and client_ok_final):
                    logger.error("Pre-API call check FAILED before final call.")
                    new_turn_messages.append({"role": "assistant", "content": "Internal Error: Client state issue before getting final response after tool use."})
                    # Return the messages generated so far (tool use + error)
                    return new_turn_messages

                # --- Make the Second API Call ---
                final_response_data = await self._call_huggingface_api(hf_messages_for_final_call)

                # --- Process Final LLM Response ---
                if final_response_data and "error" not in final_response_data:
                    try:
                        final_assistant_content = final_response_data.get("choices", [{}])[0].get("message", {}).get("content", "")
                        # Fallback for models that might use 'generated_text'
                        if not final_assistant_content and "generated_text" in final_response_data:
                            final_assistant_content = final_response_data["generated_text"]
                        if final_assistant_content:
                            logger.info("Successfully extracted final assistant response after tool use.")
                            new_turn_messages.append({"role": "assistant", "content": final_assistant_content})
                        else:
                            raise ValueError("Empty final assistant content after tool use.")
                    except Exception as e:
                        logger.error(f"Error parsing final API response after tool use: {e}", exc_info=True)
                        new_turn_messages.append({"role": "assistant", "content": f"Sorry, error processing the final response after tool use: {e}"})
                else:  # Handle an error in the second API call itself
                    error_msg = final_response_data.get("error", "API Error") if final_response_data else "API Error"
                    details = final_response_data.get("details", "") if final_response_data else ""
                    logger.error(f"Final API call (after tool use) failed: {error_msg}")
                    new_turn_messages.append({"role": "assistant", "content": f"Sorry, error processing tool result with LLM: {error_msg}" + (f"\nDetails: ```\n{details}\n```" if details else "")})
            except Exception as e:  # Handle errors during the MCP tool call (`session.call_tool`)
                logger.error(f"Error calling MCP tool '{tool_name}': {e}", exc_info=True)
                # Update the Gradio status to 'error'
                if new_turn_messages and "metadata" in new_turn_messages[-2]:
                    new_turn_messages[-2]["metadata"]["status"] = "error"
                    new_turn_messages[-2]["metadata"]["title"] = f"❌ Error using tool: {tool_name}"
                # Add an error message for the user
                new_turn_messages.append({"role": "assistant", "content": f"Sorry, I encountered an error when trying to use the tool '{tool_name}': {e}"})
        else:
            # --- Handle Regular Text Response ---
            logger.info("Adding standard text response to Gradio output.")
            new_turn_messages.append({
                "role": "assistant",
                "content": assistant_content
            })

        # Return the list of *new* assistant messages generated in this turn
        return new_turn_messages

    async def close_connection(self):
        """Closes the MCP connection and HTTP client gracefully."""
        if self.exit_stack:
            logger.info("Closing MCP connection and HTTP client resources.")
            try:
                await self.exit_stack.aclose()
            except Exception as e:
                logger.error(f"Error during resource cleanup: {e}", exc_info=True)
            finally:
                # Reset state variables regardless of cleanup success
                self.exit_stack = None
                self.session = None
                self.http_client = None
                self.tools = []
                logger.info("Resources cleanup attempted.")
        else:
            logger.info("Close connection called but no active connection found.")

# --- Gradio Interface Definition ---
client = MCPClientWrapper()  # Instantiate the client wrapper globally

def create_gradio_interface() -> gr.Blocks:
    """Creates and configures the Gradio interface."""
    logger.info("Creating Gradio interface.")
    with gr.Blocks(
        title="MCP Client + HF Inference",
        theme="Nymbo/Nymbo_Theme_5",
        css="#chatbot { font-size: 1.1em; } .message { padding: 10px !important; }"  # Example CSS
    ) as demo:
        gr.Markdown(f"# 🤖 MCP Assistant ({MODEL_NAME})")
        gr.Markdown("Connect to an MCP server and chat with a Hugging Face LLM.")

        # Connection Row
        with gr.Row():
            server_path = gr.Textbox(
                label="MCP Server Script Path",
                placeholder="Enter path to server script",
                value=DEFAULT_SERVER_SCRIPT,  # Use default value
                scale=3
            )
            connect_btn = gr.Button("🔌 Connect to MCP Server", variant="primary", scale=1)
        status = gr.Textbox(label="Connection Status", interactive=False, placeholder="Not connected.")

        # Chatbot Display
        chatbot = gr.Chatbot(
            label="Conversation",
            elem_id="chatbot",
            height=650,
            show_copy_button=True,
            bubble_full_width=False,  # Chat bubbles don't span the full width
            avatar_images=("🤔", "🤗"),  # User and Hugging Face avatars
            type="messages",  # IMPORTANT: use the dictionary message format
            show_label=False  # Hide the "Conversation" label above the chat
        )

        # Input Row
        with gr.Row():
            msg_textbox = gr.Textbox(
                label="Your Message",
                placeholder="Type your message here and press Enter...",
                scale=4,
                autofocus=True,
                show_label=False,  # Hide the "Your Message" label
                container=False  # Remove container padding/border for a tighter look
            )
            clear_btn = gr.Button("🗑️ Clear Chat", scale=1)

        # --- Event Handlers ---
        # Connect Button Action
        connect_btn.click(
            fn=client.connect,  # Call the connect method
            inputs=[server_path],  # Pass the server path textbox
            outputs=[status]  # Update the status textbox
        )

        # Message Submission Action (Enter key in the textbox)
        msg_textbox.submit(
            fn=client.process_message,  # Call the main message processing function
            inputs=[msg_textbox, chatbot],  # Pass the current message and chat history
            outputs=[chatbot, msg_textbox]  # Update the chat history and clear the message box
        )

        # Clear Button Action
        def clear_chat_and_input():
            logger.info("Clear chat button clicked.")
            return [], ""  # Empty list for the chatbot, empty string for the textbox
        clear_btn.click(
            fn=clear_chat_and_input,
            inputs=[],
            outputs=[chatbot, msg_textbox],
            queue=False  # Don't queue this action
        )

        # Handle application shutdown (optional, but good practice)
        # demo.unload(client.close_connection)  # Requires newer Gradio; async handling can be complex
    logger.info("Gradio interface created successfully.")
    return demo

# --- Main Execution Block ---
if __name__ == "__main__":
    print("\n" + "=" * 60)
    print("  MCP Client with Hugging Face Inference API  ")
    print(f"  Model: {MODEL_NAME}")
    print("=" * 60 + "\n")

    # Check for the Hugging Face token on startup
    if not HF_TOKEN:
        print("\n" + "*" * 60)
        print("  WARNING: HF_TOKEN environment variable not found!  ")
        print("  Please set it in your .env file or environment variables.")
        print("  The application will run, but language model features")
        print("  requiring the Hugging Face API will be disabled.")
        print("*" * 60 + "\n")
    else:
        print("✅ HF_TOKEN found.\n")

    # Create and launch the Gradio interface
    interface = create_gradio_interface()
    print("Launching Gradio interface...")
    print("Access it at the URL printed below. Press Ctrl+C to stop the server.")
    # Use server_name="0.0.0.0" to make the app accessible on the local network.
    # Use share=True for a temporary public link.
    # launch() blocks until the server shuts down, so cleanup runs afterwards.
    try:
        interface.launch(debug=True, server_name="0.0.0.0")
    except KeyboardInterrupt:
        logger.info("KeyboardInterrupt received, attempting shutdown.")
    finally:
        if client.exit_stack:
            print("Closing connections...")
            loop.run_until_complete(client.close_connection())
            print("Cleanup complete.")
        logger.info("Application shutting down.")