# dpc/mcp_client.py
# NOTE: the lines below were a Hugging Face file-listing artifact
# (author "cevheri", commit "build: docker error fixed", 6cc2cc4),
# preserved here as a comment so the module stays importable.
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from environs import Env
from loguru import logger
env = Env()
env.read_env()
AGENT_DEBUG = env.bool("AGENT_DEBUG", True)
LLM_PROVIDER = env.str("LLM_PROVIDER", "OPENAI")
LLM_API_KEY = env.str("LLM_API_KEY")
LLM_MODEL = env.str("LLM_MODEL")
MCP_SERVER_URL = env.str("MCP_SERVER_URL")
def get_llm():
if LLM_PROVIDER == "OPENAI":
return ChatOpenAI(model=LLM_MODEL, api_key=LLM_API_KEY)
elif LLM_PROVIDER == "GEMINI":
raise NotImplementedError("Gemini is not supported yet")
else:
raise ValueError(f"Unsupported LLM provider: {LLM_PROVIDER}")
class MCPClient:
_instance = None
def __new__(cls):
logger.info("Creating MCP client instance...")
if cls._instance is None:
cls._instance = super(MCPClient, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
logger.info("Initializing MCP client instance...")
if self._initialized:
return
try:
self.llm = get_llm()
self.tools = None
self.agent = None
self.instruction = ""
self.api_description = ""
self.system_message = ""
self._initialized = True
self.checkpointer = InMemorySaver()
self.load_instruction()
self.load_api_description()
logger.info("MCP client instance initialized successfully")
except Exception as e:
logger.error(f"Error initializing MCP client: {str(e)}")
raise
def load_instruction(self) -> str:
logger.info("Loading instruction...")
try:
instruction_path = "resources/instruction.txt"
logger.info(f"Looking for instruction file at: {instruction_path}")
with open(instruction_path, "r") as file:
self.instruction = file.read()
logger.info("Instruction loaded successfully")
except FileNotFoundError:
logger.error(f"Instruction file not found at: {instruction_path}")
raise FileNotFoundError(f"Instruction file not found at: {instruction_path}")
except Exception as e:
logger.error(f"Error loading instruction: {str(e)}")
raise
def load_api_description(self) -> str:
logger.info("Loading api description...")
try:
api_path = "resources/dpc_restapi_summary.txt"
logger.info(f"Looking for API description file at: {api_path}")
with open(api_path, "r") as file:
self.api_description = file.read()
logger.info("API description loaded successfully")
except FileNotFoundError:
logger.error(f"API description file not found at: {api_path}")
raise FileNotFoundError(f"API description file not found at: {api_path}")
except Exception as e:
logger.error(f"Error loading API description: {str(e)}")
raise
async def initialize(self):
logger.info("Initializing MCP client...")
try:
logger.info("Print Environment Variables: \n")
logger.info(f"LLM_PROVIDER: {LLM_PROVIDER}")
logger.info(f"LLM_API_KEY: {LLM_API_KEY[:12]}...")
logger.info(f"LLM_MODEL: {LLM_MODEL}")
logger.info(f"MCP_SERVER_URL: {MCP_SERVER_URL}")
self.system_message = "".join([self.instruction, "\n\n", self.api_description])
logger.info("MCP client initialization completed successfully")
except Exception as e:
logger.error(f"Error during MCP client initialization: {str(e)}")
raise
async def clear_memory(self):
"""Clear the checkpointer memory for the current conversation."""
logger.info("Clearing checkpointer memory...")
await self.checkpointer.adelete_thread(thread_id="conversation_123")
logger.info("Checkpointer memory cleared successfully")
async def invoke(self, input_messages):
logger.info(f"Invoking agent with input: {input_messages}")
async with streamablehttp_client(MCP_SERVER_URL) as (read, write, _):
async with ClientSession(read, write) as session:
await session.initialize()
logger.info("Loading tools...")
self.tools = await load_mcp_tools(session)
logger.info("Creating agent...")
self.agent = create_react_agent(
model=self.llm,
tools=self.tools,
prompt=self.system_message,
checkpointer=self.checkpointer,
debug=AGENT_DEBUG,
)
logger.info("Invoking agent...")
config = {"configurable": {"thread_id": "conversation_123"}}
result = await self.agent.ainvoke(
input={"messages": input_messages}, config=config
)
logger.info(f"Agent result: {result}")
logger.info("========================================================")
last_message = result["messages"][-1]
logger.info(f"Last message: {last_message.content}")
return last_message.content
# Example raw agent result from ``invoke`` (kept for developer reference):
# Agent result: {'messages':
# [HumanMessage(content='hi', additional_kwargs={}, response_metadata={}, id='205d9484-c4f0-4e9e-962f-218d2e82bc03'),
# AIMessage(content='This is just a greeting, so no API call is required. If you have any tasks or requests related to product catalog operations, please let me know how I can assist you!',
# additional_kwargs={'refusal': None},
# response_metadata={'token_usage': {'completion_tokens': 37, 'prompt_tokens': 27124, 'total_tokens': 27161,
# 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0},
# 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 27008}}, 'model_name': 'gpt-4.1-2025-04-14',
# 'system_fingerprint': 'fp_799e4ca3f1', 'id': 'chatcmpl-BejXD72NDlc9UqrJiobIA6tQbuNaj', 'service_tier': 'default', 'finish_reason': 'stop', 'logprobs': None}, id='run--dee9f47d-99f9-40a6-b8c7-241668e6ac38-0',
# usage_metadata={'input_tokens': 27124, 'output_tokens': 37, 'total_tokens': 27161, 'input_token_details': {'audio': 0, 'cache_read': 27008}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]}