Spaces:
Sleeping
Sleeping
| from typing import List, Tuple, Union | |
| from llama_cpp import Llama | |
| import os, json, subprocess, datetime, requests | |
| from langchain_community.llms import LlamaCpp | |
| from langchain.agents.agent_types import AgentType | |
| from langchain.agents import Tool, AgentExecutor, initialize_agent | |
| from langchain.agents.agent_types import AgentType | |
| from langchain.agents import initialize_agent, Tool, AgentExecutor, AgentType | |
| # === Load Model === | |
| MODEL_PATH = "models/capybarahermes-2.5-mistral-7b.Q5_K_S.gguf" | |
| if not os.path.exists(MODEL_PATH): | |
| raise FileNotFoundError(f"β Model not found at {MODEL_PATH}") | |
| llm = Llama(model_path=MODEL_PATH, n_ctx=2048, verbose=True) | |
| HISTORY_FILE = "agent_logs.json" | |
| # === Log Prompt/Response === | |
| def log_task(prompt: str, response: str): | |
| log = { | |
| "timestamp": datetime.datetime.now().isoformat(), | |
| "prompt": prompt, | |
| "response": response, | |
| } | |
| with open(HISTORY_FILE, "a") as f: | |
| f.write(json.dumps(log) + "\n") | |
| # === Query LLaMA === | |
| def query_llama(prompt: str, max_tokens: int = 256, temperature: float = 0.7, top_p: float = 0.9, stream: bool = False) -> str: | |
| output = llm(prompt=prompt, max_tokens=max_tokens, temperature=temperature, top_p=top_p, echo=False, stream=stream) | |
| if stream: | |
| return "".join([chunk["choices"][0]["text"] for chunk in output]).strip() | |
| return output["choices"][0]["text"].strip() | |
| # === Utilities === | |
| def exec_python(code: str) -> str: | |
| try: | |
| local_env = {} | |
| exec(code, {}, local_env) | |
| return str(local_env.get("result", "β Code executed.")) | |
| except Exception as e: | |
| return f"β Python Error: {e}" | |
| def read_file(filepath: str) -> str: | |
| try: | |
| if not os.path.exists(filepath): | |
| return f"β File not found: {filepath}" | |
| with open(filepath, "r") as f: | |
| return f.read() | |
| except Exception as e: | |
| return f"β File Read Error: {e}" | |
| def create_langchain_agent(model_path: str) -> AgentExecutor: | |
| llm = Llama( | |
| model_path=model_path, | |
| temperature=0.7, | |
| max_tokens=512, | |
| verbose=True, | |
| ) | |
| tools = [ | |
| Tool( | |
| name="LLaMA Model", | |
| func=lambda x: llm(x)["choices"][0]["text"], | |
| description="Answer general questions using local LLaMA model" | |
| ) | |
| ] | |
| return initialize_agent( | |
| tools=tools, | |
| llm=llm, | |
| agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, | |
| verbose=True, | |
| ) | |
| def make_local_agent(model_path: str): | |
| # Use LangChain-compatible wrapper | |
| llm = LlamaCpp( | |
| model_path=model_path, | |
| n_ctx=4096, | |
| temperature=0.7, | |
| verbose=True | |
| ) | |
| tools = [ | |
| Tool( | |
| name="LLaMA Tool", | |
| func=lambda x: llm.invoke(x), # Proper call | |
| description="Use the LLaMA model to answer questions" | |
| ) | |
| ] | |
| agent = initialize_agent( | |
| tools=tools, | |
| llm=llm, | |
| agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, | |
| verbose=True | |
| ) | |
| return agent | |
| def write_file(filepath: str, content: str) -> str: | |
| try: | |
| with open(filepath, "w") as f: | |
| f.write(content) | |
| return f"β File written to {filepath}" | |
| except Exception as e: | |
| return f"β File Write Error: {e}" | |
| def eval_math(expr: str) -> str: | |
| try: | |
| return str(eval(expr)) | |
| except Exception as e: | |
| return f"β Math Eval Error: {e}" | |
| def translate(text: str, lang: str = "fr") -> str: | |
| prompt = f"Translate this to {lang}:\n{text}" | |
| return query_llama(prompt) | |
| def summarize(text: str) -> str: | |
| prompt = f"Summarize this:\n{text}" | |
| return query_llama(prompt) | |
| def run_command(cmd: str) -> str: | |
| try: | |
| result = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=5) | |
| return result.stdout.decode().strip() | |
| except Exception as e: | |
| return f"β Command Error: {e}" | |
| def search_web(query: str) -> str: | |
| try: | |
| url = f"https://duckduckgo.com/html/?q={query.replace(' ', '+')}" | |
| return f"π Try this DuckDuckGo search:\n{url}" | |
| except Exception as e: | |
| return f"β Web Search Error: {e}" | |
| # === Task Planning === | |
| def plan_task(query: str) -> Tuple[str, Union[str, List[str]]]: | |
| q = query.lower() | |
| if "read file" in q: | |
| return "read_file", query.split()[-1] | |
| elif "write file" in q: | |
| parts = query.split("::") | |
| return "write_file", parts if len(parts) == 2 else [None, None] | |
| elif "calculate" in q or any(op in q for op in "+-*/"): | |
| return "eval_math", query | |
| elif "translate" in q: | |
| return "translate", query | |
| elif "summarize" in q: | |
| return "summarize", query | |
| elif "search" in q: | |
| return "web_search", query | |
| elif "run code" in q or "python" in q: | |
| return "run_code", query | |
| elif "run command" in q: | |
| return "system_command", query.replace("run command", "").strip() | |
| else: | |
| return "llama_prompt", query | |
| # === Main Handler === | |
| def run_agent(prompt: str, temperature: float = 0.7, top_p: float = 0.9, stream: bool = False) -> str: | |
| task, data = plan_task(prompt) | |
| try: | |
| if task == "run_code": | |
| result = exec_python(data) | |
| elif task == "read_file": | |
| result = read_file(data) | |
| elif task == "write_file": | |
| result = write_file(data[0], data[1]) | |
| elif task == "eval_math": | |
| result = eval_math(data) | |
| elif task == "translate": | |
| result = translate(data) | |
| elif task == "summarize": | |
| result = summarize(data) | |
| elif task == "web_search": | |
| result = search_web(data) | |
| elif task == "system_command": | |
| result = run_command(data) | |
| else: | |
| result = query_llama(data, temperature=temperature, top_p=top_p, stream=stream) | |
| except Exception as e: | |
| result = f"β Error during task: {e}" | |
| log_task(prompt, result) | |
| return result | |
| # === CLI === | |
| if __name__ == "__main__": | |
| print("π€ Enhanced LLaMA Agent Ready! (type 'exit' to quit)\n") | |
| while True: | |
| try: | |
| prompt = input("π§ You > ") | |
| if prompt.lower() in {"exit", "quit"}: | |
| break | |
| response = run_agent(prompt, stream=True) | |
| print(f"π LLaMA > {response}\n") | |
| except KeyboardInterrupt: | |
| break | |