import os  ## replace with Path
from pathlib import Path
import glob
import gradio as gr
#from watchfiles import run_process  ##gradio reload watch
import numpy as np  ##SMY
import random
from functools import partial
from typing import Tuple, Optional, Any, List, Union

from utils.utils import get_time_now_str  ##SMY lightrag_openai_compatible_demo.py
def install(package):
    import subprocess
    subprocess.check_call([os.sys.executable, "-m", "pip", "install", package])

try:
    import pipmaster as pm
except ModuleNotFoundError:  ##assist: https://discuss.huggingface.co/t/huggingface-spaces-not-updating-packages-from-requirements-txt/92865/4?u=semmyk
    install("pipmaster")
    import pipmaster as pm
if not pm.is_installed("nest_asyncio"):
    pm.install("nest_asyncio")  # HF Spaces ModuleNotFoundError: No module named 'nest_asyncio'
if not pm.is_installed("google-genai"):
    pm.install("google-genai")  ## use gemini as a client: genai "google-genai>=1.40.0"
#if not pm.is_installed("gradio[oauth]==5.29.0"):
#    pm.install("gradio[oauth]>=5.29.0")
if not pm.is_installed("pyvis"):
    pm.install("pyvis")  # "pyvis>=0.3.0"
if not pm.is_installed("networkx"):
    pm.install("networkx")  # "networkx>=3.4.2"
if not pm.is_installed("sentence-transformers"):
    pm.install("sentence-transformers")
if not pm.is_installed("hf_xet"):
    pm.install("hf_xet")  # HF Xet Storage downloader
import networkx as nx
from pyvis.network import Network
from sentence_transformers import SentenceTransformer

#from google import genai
from google.genai import types, errors, Client
from openai import APIConnectionError, APIStatusError, NotFoundError, APIError, BadRequestError

from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import openai_complete_if_cache, openai_complete, openai_embed, InvalidResponseError
from lightrag.llm.ollama import ollama_embed, ollama_model_complete
from lightrag.utils import EmbeddingFunc, logger, set_verbose_debug  ##SMY
from lightrag.kg.shared_storage import initialize_pipeline_status  ##SMY

from utils.file_utils import check_create_dir, check_create_file

import asyncio
import nest_asyncio
# Apply nest_asyncio to solve event loop issues: allow nested event loops
nest_asyncio.apply()

from dotenv import load_dotenv
# Load environment variables
load_dotenv()

import traceback
import logging, logging.config  ##SMY lightrag_openai_compatible_demo.py
from utils.logger import get_logger
logger_kg = get_logger(__name__)
# Pythonic error handling decorator
def handle_errors(func):
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            return gr.update(value=f"Error: {e}")
    return wrapper
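
# Illustrative usage sketch (assumption, not wired up here; the real callbacks live in app.py):
# `handle_errors` is meant to wrap synchronous Gradio callbacks so that an exception surfaces
# as a component update rather than an unhandled error, e.g.
#   @handle_errors
#   def preview_file(path: str):
#       return gr.update(value=Path(path).read_text(encoding="utf-8"))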
def configure_logging():
    """Configure logging for lightRAG"""
    ##SMY lightrag_openai_compatible_demo.py
    # Reset any existing handlers to ensure clean configuration
    for logger_name in ["uvicorn", "uvicorn.access", "uvicorn.error", "lightrag"]:
        logger_instance = logging.getLogger(logger_name)
        logger_instance.handlers = []
        logger_instance.filters = []

    # Get the log directory path from an environment variable or use the default
    #log_dir = os.getenv("LOG_DIR", os.getcwd())
    log_dir = os.getenv("LOG_DIR", "logs")
    if log_dir:
        log_file_path = Path(log_dir) / "lightrag_logs.log"
    else:
        log_file_path = Path("logs") / "lightrag_logs.log"
    #log_file_path.mkdir(mode=0o2755, parents=True, exist_ok=True)
    check_create_file(log_file_path)

    #print(f"\nLightRAG log file: {log_file_path}\n")
    logger_kg.log(level=20, msg="LightRAG logging creation", extra={"LightRAG log file": log_file_path.name})

    # Get the log file max size and backup count from environment variables
    log_max_bytes = int(os.getenv("LOG_MAX_BYTES", 10485760))  # Default 10MB
    log_backup_count = int(os.getenv("LOG_BACKUP_COUNT", 5))  # Default 5 backups

    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "default": {
                    "format": "%(levelname)s: %(message)s",
                },
                "detailed": {
                    "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                },
            },
            "handlers": {
                "console": {
                    "formatter": "default",
                    "class": "logging.StreamHandler",
                    "stream": "ext://sys.stderr",
                },
                "file": {
                    "formatter": "detailed",
                    "class": "logging.handlers.RotatingFileHandler",
                    "filename": log_file_path,
                    "maxBytes": log_max_bytes,
                    "backupCount": log_backup_count,
                    "encoding": "utf-8",
                },
            },
            "loggers": {
                "lightrag": {
                    "handlers": ["console", "file"],
                    "level": "INFO",
                    "propagate": False,
                },
            },
        }
    )

    # Set the logger level to INFO
    logger.setLevel(logging.INFO)
    # Enable verbose debug if needed
    set_verbose_debug(os.getenv("VERBOSE_DEBUG", "false").lower() == "true")
# Utility: Wrap async functions
##SMY: temporarily dropped in favour of async def declarations
'''def wrap_async(func):
    """Wrap an async function to run synchronously using asyncio.run"""
    async def _async_wrapper(*args, **kwargs):
        result = await func(*args, **kwargs)
        return result
    return lambda *args, **kwargs: asyncio.run(_async_wrapper(*args, **kwargs))'''
# Utility: Visualise .graphml as HTML using pyvis
def visualise_graphml(graphml_path: str, working_dir: str) -> str:
    """Convert a GraphML file to an interactive HTML visualisation"""
    ## graphml_path: defaults to lightRAG's generated graph_chunk_entity_relation.graphml
    ## working_dir: lightRAG's working directory set by user

    ## Load the GraphML file
    G = nx.read_graphml(graphml_path)

    ## Dynamically size nodes
    # Calculate node attributes for sizing
    node_degrees = dict(G.degree())
    # Scale node degrees for better visual differentiation
    max_degree = max(node_degrees.values())
    for node, degree in node_degrees.items():
        G.nodes[node]['size'] = 10 + (degree / max_degree) * 80  #40  # scaling

    ## Create a Pyvis network
    #net = Network(height="100vh", notebook=True)
    net = Network(notebook=True, width="100%", height="100vh")  #, heading="Knowledge Graph Visualisation")  #(notebook=False) height="600px",
    # Convert the NetworkX graph to a Pyvis network
    net.from_nx(G)

    # Add colours and titles to nodes
    for node in net.nodes:
        node["color"] = "#{:06x}".format(random.randint(0, 0xFFFFFF))  # random six-digit hex colour
        if "description" in node:
            node["title"] = node["description"]
    # Add titles to edges
    for edge in net.edges:
        if "description" in edge:
            edge["title"] = edge["description"]

    ## Set the 'physics' attribute to repulsion
    net.repulsion(node_distance=120, spring_length=200)
    net.show_buttons(filter_=['physics', 'layout'])  ##SMY: dynamically modify the network
    #net.show_buttons()

    ## graph path
    kg_viz_html_file = f"kg_viz_{get_time_now_str(date_format='%Y-%m-%d')}.html"
    html_path = Path(working_dir) / kg_viz_html_file

    ## Save and display the generated KG network html
    #net.save_graph(html_path)
    #net.show(html_path)
    net.show(str(html_path), local=True, notebook=False)
    # get HTML content
    html_iframe = net.generate_html(str(html_path), local=True, notebook=False)
    ## need to remove ' from the HTML  ##assist: https://huggingface.co/spaces/simonduerr/pyvisdemo/blob/main/app.py
    html_iframe = html_iframe.replace("'", "\"")

    ##SMY display the generated KG html in an iframe
    return gr.update(show_label=True, container=True, value=f"""<iframe style="width: 100%; height: 100vh;margin:0 auto" name="result" allow="midi; geolocation; microphone; camera;
        display-capture; encrypted-media;" sandbox="allow-modals allow-forms
        allow-scripts allow-same-origin allow-popups
        allow-top-navigation-by-user-activation allow-downloads" allowfullscreen=""
        allowpaymentrequest="" frameborder="0" srcdoc='{html_iframe}'></iframe>""")
# Utility: Get all markdown files in a folder
def get_markdown_files(folder: str) -> list[str]:
    """Get a sorted list of markdown files in folder"""
    #return sorted(glob.glob(os.path.join(folder, "*.md")))
    #return sorted(Path(folder).glob("*.md"))  ## change to pathlib. SMY: we're not interested in sub-directories, hence not rglob()
    #markdown_files = sorted([file for file in Path(folder).glob("*.md")])
    markdown_files_list = sorted(str(file) for file in Path(folder).iterdir() if file.suffix == ".md")
    return markdown_files_list
# LightRAG wrapper class
class LightRAGApp:
    """LightRAG application wrapper with async support"""

    def __init__(self):
        """Initialise LightRAG application state"""
        self.rag: Optional[LightRAG] = None
        self.working_dir: Optional[str] = None
        self.llm_backend: Optional[str] = None
        self.embed_backend: Optional[str] = None
        self.llm_model_name: Optional[str] = None
        self.llm_model_embed: Optional[str] = None
        self.llm_baseurl: Optional[str] = None
        self.system_prompt: Optional[str] = None
        self.status: str = ""
        self._is_initialised: bool = False  ## initialisation flag
        self.cancel_event = asyncio.Event()  ## cancel event for long-running tasks
        self.delay_between_files: Optional[float] = 60.0  ## lightRAG initialisation: delay in seconds between file processing viz RateLimitError 429
        self.llm_model_max_async: Optional[int] = 2  #4  ##SMY: https://github.com/HKUDS/LightRAG/issues/128
        self.max_parallel_insert: Optional[int] = 1  ## number of parallel files to process in one batch: assist: https://github.com/HKUDS/LightRAG/issues/1653#issuecomment-2940593112
        self.timeout: Optional[float] = 1000  #AsyncOpenAI  #Union[float, Timeout, None, NotGiven] = NOT_GIVEN
        self.max_retries: Optional[int] = 1  #AsyncOpenAI  #DEFAULT_MAX_RETRIES
    def _system_prompt(self, custom_system_prompt: Optional[str] = None) -> str:
        """Set a localised system prompt"""
        ## SMY: TODO: Make modular
        #self.system_prompt if custom_system_prompt else self.system_prompt=f"\n
        if custom_system_prompt:
            self.system_prompt = custom_system_prompt
        ''' ## system_prompt now in the gradio ui
        else:
            self.system_prompt = """
            You are a domain expert on Cybersecurity, the South African landscape and South African legislation.
            1. You only process text in English. Disregard pages containing other languages or typesetting that is not English in context.
               - South African legislation is written in English, or in English and another South African language.
            2. When building the knowledge graph, taxonomy and ontology,
               - take cognisance of NER (Named Entity Recognition) with localisation and the domain context in mind.
               - So, person(s) can be natural or juristic. Natural persons are individuals, while juristic persons are organisations.
               - For instance, the Minister of Justice is juristic. Likewise, the Information Regulator (South Africa) is juristic, while Advocate (Adv) Pansy Tlakula is natural. Ditto, the Public Protector is juristic.
            3. Different natural and juristic person(s) are assigned to perform roles.
            4. In South Africa, there are different entities (organisations or departments) defined in legislation, Acts, Bills and Policy.
               - For instance, you might have a Dept of Treasury at National level (The National Treasury) and at Provincial level (Provincial Treasuries) guided by the PFMA, while
               - Municipalities (local governments), guided by the MFMA, do not have a Treasury department, but might have a Budget & Treasury Office.
               - You have stand-alone entities like the Office of the Public Protector, headed by the Public Protector. Ditto, the Information Regulator, headed by the Chairperson of the Information Regulator.
               - You have others, like the CCMA (Commission for Conciliation, Mediation and Arbitration), that are creations of statute.
            5. Legislation includes Acts, Bills and, in some instances, Regulations and Policies.
            6. Legislation often has section headings. It also has sections detailing amendments and repeals (if any).
            7. Legislation will indicate the heading in the format Name Act No of YYYY. For instance 'Protection of Information Act No 84, 1982'.
               - Legislation might carry other Act No of YYYY references as it is amended. Take cognisance and tightly keep/link to the root legislation.
               - For instance, for the LRA the root is Labour Relations Act 66 of 1995, while some of the amendments are Labour Relations Amendment Act 6 of 2014 and Labour Relations Amendment Act 8 of 2018.
            8. Legislation will have a Gazette No and an Assented date (when the President assents to the legislation) from when it becomes operative: that is ... with effect from, or wef dd mmm YYYY.
               - Certain parts of a legislation might not be operative on the date the legislation is assented to.
            9. Legislation might have paragraph numbers. Kindly disregard these for content purposes but take cognisance of them for context.
            10. Do not create multiple nodes for legislation written in different formats.
                - For instance, maintain a single node for Protection of Information Act, Protection of Information Act, 1982, and Protection of Information Act No 84, 1982.
                - However, have a separate node for Protection of Personal Information Act, 2013, as it is a separate piece of legislation.
                - Also take note that 'Republic of South Africa' is an official geo entity while 'South Africa' is a referred-to place, although also a geo entity:
                - Always watch the context and be careful of lumping them together.
            """
        '''
        return self.system_prompt
    async def _embedding_func(self, texts: list[str], **kwargs) -> np.ndarray:
    #def _embedding_func(self, texts: list[str], **kwargs) -> np.ndarray:
        """Compute embeddings for texts using the configured embedding backend"""
        try:
            # Use HF SentenceTransformer embedding
            if self.embed_backend == "Transformer" or self.embed_backend[0] == "Transformer":
                model = SentenceTransformer("nomic-ai/nomic-embed-text-v1.5", trust_remote_code=True)  #("all-MiniLM-L6-v2")
                embeddings = model.encode(texts, convert_to_numpy=True, show_progress_bar=True)
                return embeddings
            # Use OpenAI-compatible embedding
            elif self.llm_backend == "OpenAI":
                #return wrap_async(openai_embed)(
                return await openai_embed(
                    texts,
                    model=self.llm_model_embed,
                    api_key=self.llm_api_key_embed,
                    base_url=self.llm_baseurl_embed
                    #client_configs=None  #: dict[str, Any] | None = None,
                )
            # Use Ollama
            elif self.llm_backend == "Ollama":
                #return wrap_async(ollama_embed)(
                return await ollama_embed(
                    texts,
                    embed_model=self.llm_model_embed,
                    #host=self.openai_baseurl_embed
                    host=self.ollama_host,
                    api_key=self.llm_api_key_embed
                )
        except Exception as e:
            self.status = f"{self.status} | _embedding_func error: {str(e)}"
            logger_kg.log(level=30, msg=f"{self.status} | _embedding_func error: {str(e)}")
            raise  # Re-raise to be caught by the setup method
    async def _get_embedding_dim(self) -> int:
    #def _get_embedding_dim(self) -> int:
        """Dynamically determine the embedding dimension, or fall back to known defaults"""
        try:
            test_text = ["This is a test sentence for embedding."]
            embedding = await self._embedding_func(test_text)
            ##SMY: getting asyncio error with wrap_async
            #embedding = wrap_async(self._embedding_func)(test_text)
            return embedding.shape[1]
        except Exception as e:
            self.status = f"_get_embedding_dim error: {str(e)}"
            logger_kg.log(level=30, msg=f"_get_embedding_dim error: {str(e)}")
            # Fall back to known dimensions
            if "bge-m3" in self.llm_model_embed:
                return 1024  # BAAI/bge-m3 embedding
            if self.llm_backend == "OpenAI" and "gemini" in self.llm_model_name:
                return 3072  # Gemini's gemini-embedding-exp-03-07 dimension
            if self.llm_backend == "OpenAI":
                return 1536  # OpenAI's text-embedding-3-small
            return 4096  # Ollama's default
    # Call GenAI  ##SMY: to do: follow GenAI or map to lightRAG's openai_complete()
    #async def genai_complete(self, prompt, system_prompt=None, history_messages: Optional[List[types.Content]] = None, **kwargs) -> Union[str, types.Content]:
    async def genai_complete(self, model: str, prompt: str, system_prompt: Union[str, None] = None,
                             history_messages: Union[Optional[List[types.Content]], None] = None,
                             api_key: Union[str, None] = None,
                             **kwargs) -> Union[str, types.Content]:
        """Create a GenAI client and complete a prompt"""
        # https://github.com/googleapis/python-genai/tree/main

        # 1. Combine prompts: system prompt, history, and user prompt
        if not history_messages or history_messages is None:
            history_messages = []

        ## SMY: role mapping: attempting to map assistant to user  #role:assistant pydantic error
        ''' ##SMY working code: disused. See modify_history_in_place()
        ## SMY: new dictionary by unpacking (**) the existing message dictionary, explicitly setting the "role" key to "user"
        history_messages1 = [{**message, 'role': 'user'} if message.get('role', ) == 'assistant' else message
                             for message in history_messages]
        for message in history_messages1: print(f"history len {len(history_messages1)} : \n {'\n'.join(f'{key}: {value[:25]}' for key, value in message.items())}") if isinstance(message, dict) else print(f"\n {str(message)[:25]}")
        '''
        '''
        # idiomatic way to handle an in-place mutation within a list comprehension
        # setattr(m, 'role', 'user') mutates the object in-place by changing its role attribute
        # (..., m)[1] returns the mutated object m after the setattr operation completes, necessary for the list comprehension to work correctly
        history_messages2 = [
            (lambda m: (setattr(m, 'role', 'user'), m)[1] if message.get('role', ) == 'assistant' else m)(message)
            for message in history_messages ]
        for message in history_messages2: print(f"history len {len(history_messages2)} : \n {'\n'.join(f'{key}: {value[:25]}' for key, value in message.items())}") if isinstance(message, dict) else print(f"\n {str(message)[:25]}")
        '''

        ## one-liner to change all 'model' roles to 'user': list comprehensions are not intended for in-place mutations
        #[(setattr(message, 'role', 'user'), message) for message in history_messages if hasattr(message, 'role') and message.role == 'assistant']

        def modify_history_in_place(history_messages):
            """
            Modify the history_messages list in-place, converting 'assistant' roles to 'user'.

            Args:
                history_messages: A list that may contain a mix of dicts and GenAI Content objects.
            """
            #history_messages_dict = history_messages  #debug
            ## enumerating to avoid potential "off-by-one" errors
            for index, message in enumerate(history_messages):
                # Handle the GenAI Content object using its API
                if hasattr(message, 'to_dict'):
                    msg_dict = message.to_dict()
                    #history_messages[index] = {**msg_dict, 'role': 'user'} if msg_dict.get(key='role', default_value='user') == 'assistant' else msg_dict
                    ## SMY: {'role': ['user', 'model'], 'content': 'content_text'}
                    role = 'user' if msg_dict.get('role', 'user') == 'assistant' else msg_dict.get('role', 'user')  #msg_dict.get(key='role', default_value='user')
                    parts = [types.Part.from_text(text=msg_dict.get('content', ''))]
                    history_messages[index] = types.Content(role=role, parts=parts)
                # Handle standard Python dictionaries
                elif isinstance(message, dict):
                    #history_messages[index] = {**message, 'role': 'user'} if message.get('role') == 'assistant' else message
                    role = 'user' if message.get('role', 'user') == 'assistant' else message.get('role', 'user')
                    parts = [types.Part.from_text(text=message.get('content', ''))]
                    history_messages[index] = types.Content(role=role, parts=parts)

            ##debug
            #for message in history_messages: print(f"history len {len(history_messages)} : \n {'\n'.join(f'{key}: {value[:50]}' for key, value in message.items())}") if isinstance(message, dict) else print(f"\n {str(message)[:50]}")

        modify_history_in_place(history_messages)

        # prepare messages
        messages: list[types.Content] = []
        if system_prompt:  ##See system_instruction
            history_messages.append(types.Content(role="user", parts=[types.Part.from_text(text=system_prompt)]))
        new_user_content = types.Content(role="user", parts=[types.Part.from_text(text=prompt)])
        history_messages.append(new_user_content)

        logger.debug(f"Sending messages to Gemini: Model: {self.llm_model_name.rpartition('/')[-1]} \n~ Message: {prompt}")
        logger_kg.log(level=20, msg=f"Sending messages to Gemini: Model: {self.llm_model_name.rpartition('/')[-1]} \n~ Message: {prompt}")

        # 2. Initialise the GenAI client with the Gemini API key
        client = Client(api_key=self.llm_api_key)  #api_key=gemini_api_key
        #aclient = genai.Client(api_key=self.llm_api_key).aio  # use AsyncClient

        # 3. Call the Gemini model. Don't use an async with context manager; use the client directly.
        try:
            response = client.models.generate_content(
            #response = await aclient.models.generate_content(
                model=self.llm_model_name.rpartition("/")[-1] if self.llm_model_name else "gemini-2.0-flash-exp:free",  #"gemini-2.0-flash",
                #contents=[combined_prompt],
                contents=history_messages,  #messages,
                config=types.GenerateContentConfig(
                    #max_output_tokens=5000,
                    temperature=0, top_k=10, top_p=0.1,
                    thinking_config=types.ThinkingConfig(thinking_budget=0),  # Disable thinking
                    #automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=False),
                    system_instruction=["You are an expert in knowledge graphs.",
                                        "You are well versed in entities, relations, objects and ontology reasoning.",
                                        "Your mission/task is to construct the knowledge graph, or otherwise query the knowledge graph when instructed."],  #system_prompt,
                )
            )
            ## GenAI keeps giving a pydantic error relating to 'role': 'assistant'  #weird
            ## SMY: suspect is lightRAG prompts' examples.
            logger_kg.log(level=30, msg="GenAI response: \n ", extra={"Model": response.text})
            #return response.text
        except errors.APIError as e:
            logger.error(f"GenAI API error: code: {e} ~ Status: {e.status}")
            logger_kg.log(level=30, msg=f"GenAI API call failed,\nModel: {self.llm_model_name}\nGot: code: {e} ~ Status: {e.status}")
            #client.close()  # Ensure client is closed  #Err in 1.43.0
            #await aclient.close()  # .aclose()
            raise
        except Exception as e:
            logger.error(
                f"GenAI API call failed,\nModel: {self.llm_model_name}\nGot: code: {e} ~ Traceback: {traceback.format_exc()}"
            )
            logger_kg.log(level=30, msg=f"GenAI API call failed,\nModel: {self.llm_model_name}\nGot: code: {e} ~ Traceback: {traceback.format_exc()}")
            #client.close()  # Ensure client is closed  #Err in 1.43.0
            #await aclient.close()  # .aclose()
            raise

        # 4. Return the response text
        return response.text
    #def _llm_model_func(self, prompt, system_prompt=None, history_messages=[], keyword_extraction=False,
    async def _llm_model_func(self, prompt, system_prompt=None, history_messages=[], keyword_extraction=False, **kwargs) -> str:
        """Complete a prompt via the configured LLM backend, with or without caching support."""
        try:
            ## SMY: TODO: Revisit to make modular: tie in with the Gradio UI
            if not system_prompt:
                system_prompt = self._system_prompt()
        except Exception as e:
            self.status = f"_llm_model_func: Error while setting system_prompt: {str(e)}"
            logger_kg.log(level=30, msg=f"_llm_model_func: Error while setting system_prompt: {str(e)}")
            raise

        self.status = f"{self.status}\n _llm_model_func: calling LLM to process ... with {self.llm_backend}"
        logger_kg.log(level=20, msg=f"{self.status}\n _llm_model_func: calling LLM to process ... with {self.llm_backend}")

        try:
            await asyncio.sleep(self.delay_between_files / 6)  # Pause between calls  #10s
            if self.llm_backend == "GenAI":
                return await self.genai_complete(
                    model=self.llm_model_name,
                    prompt=prompt,
                    system_prompt=system_prompt,
                    history_messages=history_messages,
                    #base_url=self.llm_baseurl,
                    api_key=self.llm_api_key,
                    **kwargs
                )
            #elif self.llm_backend == "OpenAI":
            else:
                #return openai_complete_if_cache(
                return await openai_complete_if_cache(
                    model=self.llm_model_name.rpartition('/')[-1] if "googleapi" in self.llm_baseurl else self.llm_model_name,  #"gemini" in self.llm_model_name else self.llm_model_name,
                    prompt=prompt,
                    system_prompt=system_prompt,
                    history_messages=history_messages,
                    base_url=self.llm_baseurl,
                    api_key=self.llm_api_key,
                    #timeout=self.timeout,  #: Union[float, Timeout, None, NotGiven] = NOT_GIVEN,
                    #max_retries=self.max_retries,  #: int = DEFAULT_MAX_RETRIES,
                    **kwargs,
                )
        except Exception as e:
            self.status = f"_llm_model_func: Error while calling the model: {str(e)}"
            logger_kg.log(level=30, msg=f"_llm_model_func: Error while calling the model: {str(e)}")
            raise
    def _ensure_working_dir(self) -> str:
        """Ensure working directory exists and return status message"""
        if not Path(self.working_dir).exists():
            check_create_dir(self.working_dir)
            return f"Created working directory: {self.working_dir}"
        return f"Working directory exists: {self.working_dir}"
    ##SMY: //TODO: Gradio toggle button
    async def _clear_old_data_files(self):
        """Clear old data files"""
        files_to_delete = [
            "graph_chunk_entity_relation.graphml",
            "kv_store_doc_status.json",
            "kv_store_full_docs.json",
            "kv_store_text_chunks.json",
            "vdb_chunks.json",
            "vdb_entities.json",
            "vdb_relationships.json",
        ]
        for file in files_to_delete:
            file_path = Path(self.working_dir) / file
            if file_path.exists():
                file_path.unlink()
                logger_kg.log(level=20, msg="LightRAG class: Deleting old files", extra={"filepath": file_path.name})
    async def _initialise_storages(self) -> str:
    #def _initialise_storages(self) -> str:
        """Initialise LightRAG storages and pipeline"""
        try:
            #wrap_async(self.rag.initialize_storages)
            #wrap_async(initialize_pipeline_status)
            await self.rag.initialize_storages()
            await initialize_pipeline_status()
            return "Storages and pipeline initialised successfully"
        except Exception as e:
            return f"Storage initialisation failed: {str(e)}"
    ##SMY:
    async def _initialise_rag(self):
        """Initialise lightRAG"""
        ##debug
        ## getting embeddings dynamically
        #self.status = f"Getting embeddings dynamically"
        #print(f"Getting embeddings dynamically")
        #print(f"_embedding_func: llm_model_embed: {self.llm_model_embed}")
        #print(f"_embedding_func: llm_api_key_embed: {self.llm_api_key_embed}")
        #print(f"_embedding_func: llm_baseurl_embed: {self.llm_baseurl_embed}")

        if self.working_dir_reset:
            # Clear old data files
            await self._clear_old_data_files()

        # Get embedding
        if self.embed_backend == "Transformer" or self.embed_backend[0] == "Transformer":
            logger_kg.log(level=20, msg="Getting embeddings dynamically through _embedding_func: ",
                          extra={"embedding backend": self.embed_backend})
        else:
            logger_kg.log(level=20, msg="Getting embeddings dynamically through _embedding_func: ", extra={
                "embedding backend": self.embed_backend,
                "llm_model_embed": self.llm_model_embed,
                "llm_api_key_embed": self.llm_api_key_embed,
                "llm_baseurl_embed": self.llm_baseurl_embed,
            })

        #embedding_dimension = wrap_async(self._get_embedding_dim)
        embedding_dimension = await self._get_embedding_dim()
        #print(f"Detected embedding dimension: {embedding_dimension}")
        logger_kg.log(level=20, msg="Detected embedding dimension: ", extra={"embedding_dimension": embedding_dimension, "embedding_type": self.embed_backend})

        try:
            rag = LightRAG(
                working_dir=self.working_dir,
                #llm_model_max_async=self.llm_model_max_async,  #was getting a tuple instead of an int  #1,  #opting for lightRAG default
                #max_parallel_insert=self.max_parallel_insert,  #was getting a tuple instead of an int  #1,  #opting for lightRAG default
                llm_model_name=self.llm_model_name.rpartition("/")[-1],  #self.llm_model_name,
                llm_model_func=self._llm_model_func,
                embedding_func=EmbeddingFunc(
                    embedding_dim=embedding_dimension,
                    max_token_size=int(os.getenv("MAX_EMBED_TOKENS", "8192")),  #8192,
                    func=self._embedding_func,
                ),
            )
            self.rag = rag

            # Initialise the RAG instance
            #wrap_async(self._initialise_storages)
            await self._initialise_storages()
            #await rag.initialize_storages()
            #await initialize_pipeline_status()  ##SMY: still relevant in updated lightRAG? - """Asynchronously finalise the storages"""

            self.status = "Storages and pipeline initialised successfully"  ##SMY: debug
            logger_kg.log(level=20, msg="Storages and pipeline initialised successfully")

            return self.rag  #return rag
        except Exception as e:
            tb = traceback.format_exc()
            return f"lightRAG initialisation failed: {str(e)} \n traceback: {tb}"
            #raise RuntimeWarning(f"lightRAG initialisation failed: {str(e.with_traceback())}")
    #def setup(self, data_folder: str, working_dir: str, llm_backend: str,
    async def setup(self, data_folder: str, working_dir: str, wdir_reset: bool, llm_backend: str, embed_backend: str,
                    openai_key: str, openai_baseurl: str, openai_baseurl_embed: str, llm_model_name: str,
                    llm_model_embed: str, ollama_host: str, embed_key: str, system_prompt: str) -> str:
        """Set up LightRAG with the specified configuration"""
        # Configure environment
        #os.environ["OPENAI_API_KEY"] = openai_key or os.getenv("OPENAI_API_KEY", "")
        ##os.environ["OLLAMA_HOST"] = ollama_host or os.getenv("OLLAMA_HOST", "http://localhost:11434")
        #os.environ["OLLAMA_API_BASE"] = os.getenv("OLLAMA_API_BASE")  #, "http://localhost:1337/v1/chat/completions")
        ##os.environ["OPENAI_API_BASE"] = openai_baseurl or os.getenv("OPENAI_API_BASE", "https://openrouter.ai/api/v1")
        #os.environ["OPENAI_API_EMBED_BASE"] = openai_baseurl_embed or os.getenv("OPENAI_API_EMBED_BASE")  #, "http://localhost:1234/v1/embeddings")

        # Update instance state
        self.data_folder = data_folder  ##SMY: redundant
        self.working_dir = working_dir
        self.working_dir_reset = wdir_reset
        self.llm_backend = llm_backend
        self.embed_backend = embed_backend if isinstance(embed_backend, str) else embed_backend[0]
        self.llm_model_name = llm_model_name
        self.llm_model_embed = llm_model_embed
        self.llm_baseurl = openai_baseurl
        self.llm_baseurl_embed = openai_baseurl_embed
        self.llm_api_key = openai_key
        self.ollama_host = ollama_host
        self.llm_api_key_embed = embed_key

        try:
            ## ensure the working folder exists and record the status
            try:
                self.status = self._ensure_working_dir()
            except Exception as e:
                self.status = f"LightRAG initialisation.setup: working dir err | {str(e)}"

            # Initialise lightRAG with storages
            try:
                #self.rag = wrap_async(self._initialise_rag)
                self.rag = await self._initialise_rag()
                self.status = f"{self.status}\n{self.rag}"

                # set the LightRAG class initialised flag
                self._is_initialised = True
                self.status = f"{self.status}\n Initialised LightRAG with {llm_backend} backend"
                logger_kg.log(level=20, msg=f"{self.status}\n Initialised LightRAG with {llm_backend} backend")
            except Exception as e:
                tb = traceback.format_exc()
                self.status = f"{self.status}\n LightRAG initialisation.setup and storage failed | {str(e)}"
                logger_kg.log(level=30, msg=f"{self.status}\n LightRAG initialisation.setup and storage failed | {str(e)} \n traceback: {tb}")
        except Exception as e:
            self._is_initialised = False
            tb = traceback.format_exc()
            self.status = (f"LightRAG initialisation failed: {str(e)}\n"
                           f"LightRAG with {working_dir} and {llm_backend} not initialised")
            logger_kg.log(level=30, msg=f"LightRAG with {working_dir} and {llm_backend} not initialised\n"
                          f"LightRAG initialisation failed: {str(e)}\n{tb}")
            return self.status

        return self.status
    ''' ##SMY: disabled to follow the lightRAG documentation
    @handle_errors
    #def setup(self, data_folder: str, working_dir: str, llm_backend: str,
    async def setup(self, data_folder: str, working_dir: str, llm_backend: str,
                    openai_key: str, llm_baseurl: str, llm_model_name: str,
                    llm_model_embed: str) -> str:
        """Set up LightRAG with specified configuration"""
    '''
    async def index_documents(self, data_folder: Union[list[str], str]) -> Tuple[str, str]:
    #def index_documents(self, data_folder: str) -> Tuple[str, str]:
        """Index markdown documents with progress tracking"""
        if not self._is_initialised or self.rag is None:
            return "Please initialise LightRAG first using the 'Initialise App' button.", "Not started"

        #md_files = get_markdown_files(data_folder)  #data_folder is now a list of uploaded files
        #if not md_files:
        #    return f"No markdown files found in {data_folder}:", "No files"
        md_files = data_folder
        if not md_files:
            return f"No markdown files uploaded {data_folder}:", "No files"

        try:
            total_files = len(md_files)
            #self.status = f"Starting to index {total_files} files..."
            status_msg = f"Starting to index {total_files} files"
            progress_msg = f"Found {total_files} files to index"

            self.reset_cancel()  ## Reset at the start of each operation.  ##TODO: ditto for query

            ##SMY: opted for enumerated lightRAG ainsert to handle LLM RateLimitError 429
            for idx, md_file in enumerate(md_files, 1):
                ## cancel indexing
                if self.cancel_event.is_set():
                    self.status = "Indexing cancelled by user."
                    logger_kg.log(level=20, msg=f"{self.status}")
                    return self.status, "Cancelled"
                else:
                    #delay_between_files: float = 60.0  ## Delay in seconds between file processing viz RateLimitError 429
                    try:
                        with open(md_file, "r", encoding="utf-8") as f:
                            text = f.read()
                        ##SMY: 15Oct25. Prefix text with 'Search_document' to aid embedding indexing
                        text = "Search_document" + text

                        #status_msg = f"Indexing file {idx}/{total_files}: {os.path.basename(md_file)}"
                        #progress_msg = f"Processing {idx}/{total_files}: {os.path.basename(md_file)}"
                        status_msg = f"Indexing file {idx}/{total_files}: {Path(md_file).name}"
                        progress_msg = f"Processing {idx}/{total_files}: {Path(md_file).name}"

                        # Async insert (the wrap_async path is disabled)
                        ###wrap_async(self.rag.)(text, file_paths=md_file)
                        await self.rag.ainsert(text, file_paths=md_file)  ##SMY: TODO [12Oct25]: Err: "object of type 'WindowsPath' has no len()"
                        #wrap_async(self.rag.ainsert)(input=text, filepaths=md_file)
                        await asyncio.sleep(self.delay_between_files)  # Pause between file processing

                        status_msg = f"{self.status}\n Successfully indexed {total_files} markdown files."
                        progress_msg = f"{self.status}\n Completed: {total_files} files indexed"
                        logger_kg.log(level=20, msg=f"{self.status}\n Successfully indexed {total_files} markdown files.")
                    #'''  ##SMY: flagged: to delete
                    except (NotFoundError, InvalidResponseError, APIError, APIStatusError, APIConnectionError, BadRequestError):  ##limit_async
                        # Get the model name excluding the model provider
                        self.rag.llm_model_name = self.llm_model_name.rpartition("/")[-1]

                        status_msg = f"Retrying indexing file {idx}/{total_files}: {Path(md_file).name}"
                        progress_msg = f"Retrying processing {idx}/{total_files}: {Path(md_file).name}"

                        # Retry the insert once with the shortened model name
                        await self.rag.ainsert(text, file_paths=md_file)
                        #wrap_async(self.rag.ainsert)(input=text, filepaths=md_file)
                        await asyncio.sleep(self.delay_between_files)  # Pause between file processing
                    #'''
                    except Exception as e:
                        tb = traceback.format_exc()
                        #self.status = f"Error indexing {os.path.basename(md_file)}: {str(e)}"
                        status_msg = f"Error indexing {Path(md_file).name}: {str(e)}"
                        progress_msg = f"Failed on {idx}/{total_files}: {Path(md_file).name}"
                        logger_kg.log(level=30, msg=f"Error indexing: Failed on {idx}/{total_files}: {Path(md_file).name} - {str(e)} \n traceback: {tb}")
                        continue

                await asyncio.sleep(1)  #(0)  ## Yield to the event loop
        except Exception as e:
            tb = traceback.format_exc()
            status_msg = f"{self.status}\n Indexing failed: {str(e)}"
            progress_msg = f"{self.status}\n Indexing failed"
            logger_kg.log(level=30, msg=f"{self.status}\n Indexing failed: {str(e)} \n traceback: {tb}")

        '''status_msg = f"{self.status}\n Successfully indexed {total_files} markdown files."
        progress_msg = f"{self.status}\n Completed: {total_files} files indexed"
        logger_kg.log(level=20, msg=f"{self.status}\n Successfully indexed {total_files} markdown files.")'''

        return status_msg, progress_msg
    async def query(self, query_text: str, mode: str) -> str:
    #def query(self, query_text: str, mode: str) -> str:
        """Query LightRAG with the specified mode"""
        if not self._is_initialised or self.rag is None:
            return (f"Please initialise LightRAG first using the 'Initialise App' button \n"
                    f" and index with the 'Index Documents' button")
        param = QueryParam(mode=mode)

        ## return the lightRAG query answer
        # Async query (the wrap_async path is disabled; see the error note below)
        ###return await wrap_async(self.rag.aquery)(query_text, param=param)
        #return wrap_async(self.rag.aquery)(query_text, param=param)
        return await self.rag.aquery(query_text, param=param)  ##SMY:
        #####Err
        ##return lambda *args, **kwargs: asyncio.run(_async_wrapper(*args, **kwargs))
        ##File "C:\Dat\dev\Python\Python312\Lib\asyncio\runners.py", line 190, in run
        ##raise RuntimeError(
        ##RuntimeError: asyncio.run() cannot be called from a running event loop
    def show_kg(self) -> str:
        """Display knowledge graph visualisation"""
        ## graphml_path: defaults to lightRAG's generated graph_chunk_entity_relation.graphml
        ## working_dir: lightRAG's working directory set by user
        graphml_path = Path(self.working_dir) / "graph_chunk_entity_relation.graphml"
        if not Path(graphml_path).exists():
            return "Knowledge graph file not found. Please index documents first to generate the knowledge graph."
        #return visualise_graphml(graphml_path)
        return visualise_graphml(graphml_path, self.working_dir)

    def reset_cancel(self):
        """Reset cancel event"""
        self.cancel_event.clear()

    def trigger_cancel(self):
        """Set cancel event"""
        self.cancel_event.set()
############
'''
##SMY: record only. for deletion
# Wrap with EmbeddingFunc to provide required attributes
embed_func = EmbeddingFunc(
    #embedding_dim=1536,  # OpenAI's text-embedding-3-small dimension
    #max_token_size=8192,  # OpenAI's max token size
    embedding_dim=3072,  # Gemini's gemini-embedding-exp-03-07 dimension
    max_token_size=8000,  # Gemini's embedding max token size = 20000
    func=embedding_func
)
'''

# Instantiate app logic
#app_logic = LightRAGApp()  ##SMY: already instantiated in app.main()

# Gradio UI  ## moved to app.py
#def gradio_ui():
#    ...
#    return gradio_ui

#if __name__ == "__main__":
#    gradio_ui().launch()
#    ...
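
# Usage sketch (illustration only, kept disabled like the other record-only blocks above;
# the real wiring lives in app.py and the argument values below are placeholders/assumptions):
'''
async def _example():
    app_logic = LightRAGApp()
    status = await app_logic.setup(
        data_folder="data", working_dir="rag_storage", wdir_reset=False,
        llm_backend="OpenAI", embed_backend="Transformer",
        openai_key=os.getenv("OPENAI_API_KEY", ""), openai_baseurl="https://openrouter.ai/api/v1",
        openai_baseurl_embed="", llm_model_name="openai/gpt-4o-mini", llm_model_embed="",
        ollama_host="", embed_key="", system_prompt="",
    )
    status_msg, progress_msg = await app_logic.index_documents(get_markdown_files("data"))
    answer = await app_logic.query("Which entities appear in the corpus?", mode="hybrid")
    return status, status_msg, progress_msg, answer

#asyncio.run(_example())
'''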