"""Shared helpers and public pipeline entry points.

Covers saving uploaded documents, routing them to the matching ingest/QA
helpers, and the chat handlers that accept text or audio input.
"""
# NOTE: the import order is intentionally kept as in the original file. The
# star import runs before the stdlib imports, so any name it happens to export
# cannot shadow `os`, `time` or the typing names used below.
from logger.custom_logger import CustomLoggerTracker
from dotenv import load_dotenv
from docs_utils import *
from audio_utils import transcribe_audio
from pipeQuery import process_query
import os
import time
from typing import Dict, List, Tuple, Optional, Any
from configs import load_yaml_config

# Loading config yaml
config = load_yaml_config("config.yaml")

# Load .env early
load_dotenv()

# ---------------------------
# Custom Logger Initialization
# ---------------------------
custom_log = CustomLoggerTracker()
logger = custom_log.get_logger("utils")
logger.info("Logger initialized for utils/functions module")

# ---------------------------
# Environment & Globals
# ---------------------------
env = os.getenv("ENVIRONMENT", "production")
SESSION_ID = "default"
pending_clarifications: Dict[str, str] = {}
SILICONFLOW_API_KEY = os.getenv("SILICONFLOW_API_KEY", "")
SILICONFLOW_URL = os.getenv("SILICONFLOW_URL", "").strip()
SILICONFLOW_CHAT_URL = os.getenv(
    "SILICONFLOW_CHAT_URL",
    "https://api.siliconflow.com/v1/chat/completions").strip()

# Document types mapping for validation
VALID_DOC_TYPES = {
    "Knowledge Document": "knowledge",
    "User-Specific Document": "user_specific",
    "Old Document": "old",
    "New Document": "new",
    "None": None
}

if not SILICONFLOW_API_KEY:
    logger.warning("SILICONFLOW_API_KEY is not set. LLM/Reranker calls may fail.")
if not SILICONFLOW_URL:
    logger.warning("SILICONFLOW_URL is not set. OpenAI client base_url will not work.")


# ---------------------------
# Utility Functions - Refactored
# ---------------------------
def validate_document_type(doc_type: str) -> bool:
    """Return True when *doc_type* is one of the keys of VALID_DOC_TYPES."""
    return doc_type in VALID_DOC_TYPES


def get_upload_directory() -> str:
    """Return the ``uploaded_docs`` directory next to this module.

    The directory is created when it does not exist yet.
    """
    upload_dir = os.path.join(os.path.dirname(__file__), "uploaded_docs")
    os.makedirs(upload_dir, exist_ok=True)
    return upload_dir


def save_uploaded_file(doc_file: Any, filename: Optional[str] = None) -> str:
    """Copy an uploaded document into the upload directory.

    Args:
        doc_file: Either an object with a ``read()`` method or a path to an
            existing file.
        filename: Optional name for the saved copy. Only its basename is used.

    Returns:
        The path of the saved copy.

    Raises:
        ValueError: If *doc_file* is None.
        IOError: If reading or writing fails, or the content is empty.
    """
    if doc_file is None:
        raise ValueError("Document file cannot be None")

    # basename() drops directory components so the copy always lands directly
    # inside the upload directory.
    if filename:
        safe_filename = os.path.basename(filename)
    elif isinstance(doc_file, (str, os.PathLike)):
        # A plain path has no ``name`` attribute; keep its own file name (and
        # extension) instead of falling back to a generated one.
        safe_filename = os.path.basename(os.fspath(doc_file))
    else:
        safe_filename = os.path.basename(getattr(doc_file, 'name', 'unknown_file'))

    if not safe_filename or safe_filename == 'unknown_file':
        safe_filename = f"document_{int(time.time())}"

    upload_dir = get_upload_directory()
    save_path = os.path.join(upload_dir, safe_filename)
    logger.info(f"Saving document to: {save_path}")

    try:
        # Handle file-like objects vs path strings
        if hasattr(doc_file, 'read'):
            file_bytes = doc_file.read()
        else:
            with open(str(doc_file), 'rb') as f:
                file_bytes = f.read()

        # Validate file content
        if not file_bytes:
            raise ValueError("File appears to be empty")

        with open(save_path, "wb") as f:
            f.write(file_bytes)

        logger.info(f"Successfully saved uploaded file to {save_path}")
        return save_path
    except Exception as e:
        logger.error(f"Error saving file: {e}")
        # Chain the cause so the original traceback is not lost.
        raise IOError(f"Failed to save file: {e}") from e


def process_document_by_type(query: str, save_path: str, doc_type: str) -> str:
    """Ingest the document at *save_path* and answer *query* against it.

    The ingest/QA helper pair is chosen from *doc_type*; "User-Specific
    Document" and "New Document" share the same pair.

    Returns:
        A string of the form ``"[<doc_type> Uploaded]\\n<status>\\n\\n<answer>"``.

    Raises:
        ValueError: If *doc_type* is not a key of VALID_DOC_TYPES, or is a key
            with no processing branch (i.e. "None").
        Exception: Whatever the ingest/QA helpers raise is logged and re-raised.
    """
    if not validate_document_type(doc_type):
        raise ValueError(f"Invalid document type: {doc_type}")

    try:
        if doc_type == "Knowledge Document":
            ingest, answer_query = rag_dom_ingest, rag_dom_qa
        elif doc_type == "Old Document":
            ingest, answer_query = old_doc_ingestion, old_doc_qa
        elif doc_type in ("User-Specific Document", "New Document"):
            ingest, answer_query = user_doc_ingest, user_doc_qa
        else:
            raise ValueError(f"Unsupported document type: {doc_type}")

        logger.info(f"Processing as {doc_type}")
        status = ingest(save_path)
        answer = answer_query(query)
        # The bracketed label is always the doc_type itself.
        return f"[{doc_type} Uploaded]\n{status}\n\n{answer}"
    except Exception as e:
        logger.error(f"Error processing document type {doc_type}: {e}")
        raise


def validate_query(query: Optional[str]) -> bool:
    """Return True when *query* is a string with non-whitespace content."""
    return isinstance(query, str) and bool(query.strip())


# ---------------------------
# Public Interfaces
# ---------------------------
def main_pipeline_interface(query: str) -> str:
    """Run *query* through process_query as a first turn.

    Raises:
        ValueError: If *query* is empty.
        Exception: Errors from process_query are logged and re-raised.
    """
    if not validate_query(query):
        raise ValueError("Query cannot be empty")

    logger.info("Main pipeline interface called")
    try:
        return process_query(query, first_turn=True)
    except Exception as e:
        logger.error(f"Error in main pipeline: {e}")
        raise


def main_pipeline_with_doc_and_history(
    query: str,
    doc_file: Any,
    doc_type: str,
    history: str
) -> Tuple[str, str]:
    """Answer *query* (optionally with a document) and extend a text history.

    Returns:
        ``(response, updated_history)``. For an empty query the history is
        returned unchanged; on failure the error text is recorded as the reply.
    """
    if not validate_query(query):
        return "Please provide a valid query.", history

    logger.info("Pipeline with doc and history called")
    try:
        response = main_pipeline_with_doc(query, doc_file, doc_type)
        updated_history = f"{history}\nUser: {query}\nWisal: {response}\n"
        return response, updated_history
    except Exception as e:
        logger.error(f"Error in pipeline with doc and history: {e}")
        error_response = f"Sorry, I encountered an error: {str(e)}"
        updated_history = f"{history}\nUser: {query}\nWisal: {error_response}\n"
        return error_response, updated_history


def main_pipeline_with_doc(query: str, doc_file: Any, doc_type: str) -> str:
    """Answer *query*, ingesting *doc_file* first when one is supplied.

    Failures are reported in the returned string rather than raised.
    """
    if not validate_query(query):
        return "Please provide a valid query."

    logger.info(f"Pipeline with doc called - doc_type: {doc_type}")

    # If no document, use main pipeline
    if doc_file is None or doc_type == "None":
        logger.info("No document provided, using main pipeline")
        try:
            return process_query(query, first_turn=True)
        except Exception as e:
            logger.error(f"Error in main pipeline: {e}")
            return f"Sorry, I encountered an error processing your query: {str(e)}"

    # Validate document type
    if not validate_document_type(doc_type):
        logger.warning(f"Invalid document type: {doc_type}")
        return f"Invalid document type: {doc_type}. Valid types are: {', '.join(VALID_DOC_TYPES.keys())}"

    try:
        save_path = save_uploaded_file(doc_file)
        return process_document_by_type(query, save_path, doc_type)
    except Exception as e:
        logger.error(f"Error in document processing: {e}")
        return f"Sorry, I encountered an error processing your document: {str(e)}"


def pipeline_with_history(
    message: str,
    doc_file: Any,
    doc_type: str,
    history: List[List[str]]
) -> Tuple[List[List[str]], str]:
    """Answer *message* and append ``[message, reply]`` to *history*.

    A non-empty *history* list is appended to in place and returned.

    Returns:
        ``(history, "")``; the empty string is always the second element.
    """
    logger.info("Pipeline with history called")
    history = history or []

    if not validate_query(message):
        logger.warning("Empty message received")
        error_msg = "Please provide a valid message."
        history.append([message or "", error_msg])
        return history, ""

    try:
        response = main_pipeline_with_doc(message, doc_file, doc_type)
        history.append([message, response])
        logger.info("Successfully processed message with history")
        return history, ""
    except Exception as e:
        logger.error(f"Error in pipeline with history: {e}")
        error_response = f"Sorry, I encountered an error: {str(e)}"
        history.append([message, error_response])
        return history, ""


def _transcribe_to_text(audio_file: Any) -> Tuple[Optional[str], Optional[str]]:
    """Drain transcribe_audio() and return ``(transcript, error)``.

    A string output starting with "[ERROR]" stops the loop and is returned as
    the error. Outputs starting with "Status:" and non-string outputs are
    skipped; the last remaining string is the transcript (None if there is
    none).
    """
    transcript = None
    for out in transcribe_audio(audio_file):
        if not isinstance(out, str):
            continue
        if out.startswith("[ERROR]"):
            return None, out
        if not out.startswith("Status:"):
            transcript = out
    return transcript, None


def unified_handler(
    user_text: Optional[str],
    audio_file: Any,
    chat_history: List[Tuple[str, str]]
) -> Tuple[List[Tuple[str, str]], str, Any]:
    """Handle one chat turn from text or, failing that, audio input.

    Text takes priority over audio. Results and errors are appended to
    *chat_history* as ``(speaker, text)`` tuples.

    Returns:
        ``(chat_history, "", None)`` in every case.
    """
    logger.info("Unified handler called")
    chat_history = chat_history or []
    msg_from_user = None

    if validate_query(user_text):
        msg_from_user = user_text
        logger.info("Processing text input")
    elif audio_file:
        logger.info("Processing audio input")
        try:
            transcript, error = _transcribe_to_text(audio_file)
        except Exception as e:
            logger.error(f"Error processing audio: {e}")
            chat_history.append(("System", f"Audio processing error: {str(e)}"))
            return chat_history, "", None

        if error is not None:
            chat_history.append(("System", error))
            return chat_history, "", None
        if not validate_query(transcript):
            chat_history.append(("System", "Could not transcribe audio properly"))
            return chat_history, "", None

        msg_from_user = transcript
        logger.info("Successfully transcribed audio")

    if msg_from_user:
        try:
            logger.info(f"Processing message: {msg_from_user[:50]}...")
            chat_history.append(("User", msg_from_user))
            wisal_reply = process_query(msg_from_user)
            chat_history.append(("Wisal", wisal_reply))
            logger.info("Successfully processed message in unified handler")
            return chat_history, "", None
        except Exception as e:
            logger.error(f"Error processing query: {e}")
            chat_history.append(("System", f"Processing error: {str(e)}"))
            return chat_history, "", None

    logger.warning("No valid input received in unified handler")
    chat_history.append(("System", "Please provide either text or audio input"))
    return chat_history, "", None


def wisal_handler(
    user_text: Optional[str],
    audio_file: Any,
    chat_history: List[Tuple[str, str]]
) -> Tuple[List[Tuple[str, str]], str, Any]:
    """Handle one chat turn from text or, failing that, audio input.

    Text takes priority over audio. Results and errors are appended to
    *chat_history* as ``(speaker, text)`` tuples.

    Returns:
        ``(chat_history, "", None)`` in every case.
    """
    logger.info("Wisal handler called")
    chat_history = chat_history or []

    if validate_query(user_text):
        logger.info("Processing text input in Wisal handler")
        try:
            response = process_query(user_text)
            chat_history.append(("User", user_text))
            chat_history.append(("Wisal", response))
            return chat_history, "", None
        except Exception as e:
            logger.error(f"Error processing text in Wisal handler: {e}")
            chat_history.append(("User", user_text))
            chat_history.append(("System", f"Processing error: {str(e)}"))
            return chat_history, "", None

    if audio_file:
        logger.info("Processing audio input in Wisal handler")
        # The try block also covers process_query, so a query failure on the
        # audio path is reported as an audio processing error.
        try:
            transcription, error = _transcribe_to_text(audio_file)
            if error is not None:
                chat_history.append(("System", error))
                return chat_history, "", None
            if not validate_query(transcription):
                chat_history.append(("System", "Could not transcribe audio properly"))
                return chat_history, "", None

            logger.info("Successfully transcribed audio")
            chat_history.append(("User", transcription))
            wisal_reply = process_query(transcription)
            chat_history.append(("Wisal", wisal_reply))
            return chat_history, "", None
        except Exception as e:
            logger.error(f"Error processing audio in Wisal handler: {e}")
            chat_history.append(("System", f"Audio processing error: {str(e)}"))
            return chat_history, "", None

    logger.warning("No valid input received in Wisal handler")
    chat_history.append(("System", "Please provide either text or audio input"))
    return chat_history, "", None


def _run_self_tests() -> None:
    """Run the manual smoke-test script and print the results to stdout."""

    def section(title: str, width: int = 50) -> None:
        # Blank line, rule, title, rule.
        print(f"\n{'=' * width}")
        print(title)
        print(f"{'=' * width}")

    # Test file paths
    pdf_test = "tests/Computational Requirements for Embed.pdf"
    docs_test = "tests/Computational Requirements for Embed.docx"
    txt_test = "assets/RAG_Documents/Autism_Books_1.txt"

    print("=" * 70)
    print("COMPREHENSIVE UTILS/FUNCTIONS TEST SUITE")
    print("=" * 70)

    # ===========================
    # Test 1: Utility Functions
    # ===========================
    section("TEST 1: UTILITY FUNCTIONS")

    # Test document type validation
    print("Testing document type validation...")
    valid_types = ["Knowledge Document", "User-Specific Document", "Old Document", "New Document", "None"]
    invalid_types = ["Random Document", "Invalid Type", "", None]

    for doc_type in valid_types:
        result = validate_document_type(doc_type)
        print(f"✓ Valid type '{doc_type}': {result}")

    for doc_type in invalid_types:
        result = validate_document_type(doc_type)
        print(f"✗ Invalid type '{doc_type}': {result}")

    # Test upload directory creation
    print("\nTesting upload directory creation...")
    try:
        upload_dir = get_upload_directory()
        if os.path.exists(upload_dir):
            print(f"✓ Upload directory created/exists: {upload_dir}")
        else:
            print(f"✗ Upload directory not found: {upload_dir}")
    except Exception as e:
        print(f"✗ Upload directory test failed: {e}")

    # Test query validation
    print("\nTesting query validation...")
    valid_queries = ["What is autism?", "Help me understand treatments", "a"]
    invalid_queries = ["", " ", None]

    for query in valid_queries:
        result = validate_query(query)
        print(f"✓ Valid query '{query}': {result}")

    for query in invalid_queries:
        result = validate_query(query)
        print(f"✗ Invalid query '{query}': {result}")

    # ===========================
    # Test 2: File Upload and Saving with Real Files
    # ===========================
    section("TEST 2: FILE UPLOAD AND SAVING WITH REAL FILES")

    test_files = [
        (pdf_test, "PDF"),
        (docs_test, "DOCX"),
        (txt_test, "TXT")
    ]

    for file_path, file_type in test_files:
        print(f"\nTesting {file_type} file upload: {os.path.basename(file_path)}")
        if os.path.exists(file_path):
            try:
                save_path = save_uploaded_file(file_path, f"test_{file_type.lower()}.{file_type.lower()}")
                if os.path.exists(save_path):
                    print(f"✓ {file_type} file saved successfully: {save_path}")
                    # Check file size
                    original_size = os.path.getsize(file_path)
                    saved_size = os.path.getsize(save_path)
                    if original_size == saved_size:
                        print(f"✓ {file_type} file size matches: {saved_size} bytes")
                    else:
                        print(f"✗ {file_type} file size mismatch: {original_size} vs {saved_size}")
                else:
                    print(f"✗ {file_type} file not found after saving")
            except Exception as e:
                print(f"✗ {file_type} file upload failed: {e}")
        else:
            print(f"✗ {file_type} test file not found: {file_path}")

    # Test error handling for invalid files
    print("\nTesting error handling for invalid files...")
    try:
        save_uploaded_file(None)
        print("✗ Should have failed with None file")
    except ValueError as e:
        print(f"✓ Correctly handled None file: {e}")
    except Exception as e:
        print(f"✓ Handled error: {e}")

    # ===========================
    # Test 3: Document Processing by Type with Real Files
    # ===========================
    section("TEST 3: DOCUMENT PROCESSING BY TYPE WITH REAL FILES")

    test_query = "What does this document say about computational requirements?"

    # Test with text file
    if os.path.exists(txt_test):
        print(f"Testing document processing with: {os.path.basename(txt_test)}")
        for doc_type in ["Knowledge Document", "User-Specific Document", "Old Document", "New Document"]:
            print(f"\nTesting {doc_type} processing...")
            try:
                result = process_document_by_type(test_query, txt_test, doc_type)
                print(f"✓ {doc_type} processed successfully")
                print(f" Response preview: {result[:150]}...")
            except Exception as e:
                print(f"✗ {doc_type} processing failed: {e}")
    else:
        print(f"✗ Text test file not found: {txt_test}")

    # Test with PDF file if available
    if os.path.exists(pdf_test):
        print(f"\nTesting PDF document processing: {os.path.basename(pdf_test)}")
        try:
            result = process_document_by_type(test_query, pdf_test, "Knowledge Document")
            print("✓ PDF processed as Knowledge Document successfully")
            print(f" Response preview: {result[:150]}...")
        except Exception as e:
            print(f"✗ PDF processing failed: {e}")
    else:
        print(f"✗ PDF test file not found: {pdf_test}")

    # Test invalid document type
    print("\nTesting invalid document type...")
    try:
        if os.path.exists(txt_test):
            process_document_by_type(test_query, txt_test, "Invalid Type")
            print("✗ Should have failed with invalid type")
        else:
            print("⚠ Skipping invalid type test - no test file available")
    except ValueError as e:
        print(f"✓ Correctly handled invalid document type: {e}")

    # ===========================
    # Test 4: Main Pipeline Interface
    # ===========================
    section("TEST 4: MAIN PIPELINE INTERFACE")

    # Test main pipeline with autism-related queries
    print("Testing main pipeline interface...")
    test_queries = [
        "What is autism?",
        "How can I help a child with autism?",
        "Tell me about autism interventions",
        "What are the symptoms of ASD?"
    ]

    for query in test_queries:
        print(f"\nTesting query: '{query}'")
        try:
            result = main_pipeline_interface(query)
            print(f"✓ Pipeline response received: {result[:100]}...")
        except Exception as e:
            print(f"✗ Pipeline failed for query '{query}': {e}")

    # Test with non-autism query
    print("\nTesting non-autism related query...")
    try:
        result = main_pipeline_interface("What's the weather like?")
        print(f"✓ Non-autism query handled: {result[:100]}...")
    except Exception as e:
        print(f"✗ Non-autism query failed: {e}")

    # Test with empty query
    print("\nTesting empty query...")
    try:
        main_pipeline_interface("")
        print("✗ Should have failed with empty query")
    except ValueError as e:
        print(f"✓ Correctly handled empty query: {e}")

    # ===========================
    # Test 5: Pipeline with Document and History
    # ===========================
    section("TEST 5: PIPELINE WITH DOCUMENT AND HISTORY")

    # Test with real document
    if os.path.exists(txt_test):
        print("Testing pipeline with document and history...")
        try:
            initial_history = "Previous conversation history here."
            response, updated_history = main_pipeline_with_doc_and_history(
                "What information is in this document about autism?",
                txt_test,
                "Knowledge Document",
                initial_history
            )
            print("✓ Pipeline with document successful")
            print(f" Response preview: {response[:100]}...")
            print(f" History updated: {'Yes' if len(updated_history) > len(initial_history) else 'No'}")
        except Exception as e:
            print(f"✗ Pipeline with document failed: {e}")
    else:
        print(f"✗ Cannot test with document - file not found: {txt_test}")

    # Test without document
    print("\nTesting pipeline without document...")
    try:
        response = main_pipeline_with_doc("What is autism spectrum disorder?", None, "None")
        print(f"✓ Pipeline without document successful: {response[:100]}...")
    except Exception as e:
        print(f"✗ Pipeline without document failed: {e}")

    # ===========================
    # Test 6: Pipeline with History Management
    # ===========================
    section("TEST 6: PIPELINE WITH HISTORY MANAGEMENT")

    print("Testing pipeline with history management...")
    try:
        initial_history = [["Previous user message", "Previous bot response"]]
        # pipeline_with_history appends to the list it is given, so the
        # starting length has to be captured before the call.
        initial_len = len(initial_history)
        history, cleared_input = pipeline_with_history(
            "Tell me about autism therapy approaches",
            None,
            "None",
            initial_history
        )
        if len(history) > initial_len:
            print("✓ History updated successfully")
            print(f" History entries: {len(history)}")
            print(f" Latest entry: {history[-1][0][:50]}...")
        else:
            print("✗ History not updated properly")
    except Exception as e:
        print(f"✗ Pipeline with history failed: {e}")

    # Test with document in history pipeline
    if os.path.exists(txt_test):
        print("\nTesting history pipeline with document...")
        try:
            history, cleared = pipeline_with_history(
                "Analyze this document for autism information",
                txt_test,
                "User-Specific Document",
                []
            )
            if len(history) > 0:
                print("✓ Document processing in history pipeline successful")
                print(f" Response preview: {history[-1][1][:100]}...")
            else:
                print("✗ No history entries created")
        except Exception as e:
            print(f"✗ History pipeline with document failed: {e}")

    # ===========================
    # Test 7: Unified Handler
    # ===========================
    section("TEST 7: UNIFIED HANDLER")

    # Test with text input
    print("Testing unified handler with text input...")
    try:
        history, cleared_text, cleared_audio = unified_handler(
            "What are the early signs of autism?",
            None,
            []
        )
        if len(history) >= 2:  # User message + Wisal response
            print("✓ Text input processed successfully")
            print(f" User message: {history[-2][1][:50]}...")
            print(f" Wisal response: {history[-1][1][:50]}...")
        else:
            print("✗ Text input not processed correctly")
    except Exception as e:
        print(f"✗ Unified handler with text failed: {e}")

    # Test with no input
    print("\nTesting unified handler with no input...")
    try:
        history, cleared_text, cleared_audio = unified_handler(None, None, [])
        if any("Please provide either text or audio input" in str(entry) for entry in history):
            print("✓ No input handled correctly")
        else:
            print("✗ No input not handled properly")
    except Exception as e:
        print(f"✗ Unified handler with no input failed: {e}")

    # ===========================
    # Test 8: Wisal Handler
    # ===========================
    section("TEST 8: WISAL HANDLER")

    # Test with text input
    print("Testing Wisal handler with text input...")
    try:
        history, cleared_text, cleared_audio = wisal_handler(
            "Explain autism sensory sensitivities",
            None,
            []
        )
        if len(history) >= 2:
            print("✓ Wisal text processing successful")
            print(f" User message: {history[-2][1][:50]}...")
            print(f" Wisal response: {history[-1][1][:50]}...")
        else:
            print("✗ Wisal response not found in history")
    except Exception as e:
        print(f"✗ Wisal handler with text failed: {e}")

    # Test Wisal handler with no input
    print("\nTesting Wisal handler with no input...")
    try:
        history, cleared_text, cleared_audio = wisal_handler(None, None, [])
        if len(history) > 0:
            print("✓ Wisal no input handled correctly")
            print(f" System message: {history[-1][1]}")
        else:
            print("✗ Wisal no input not handled")
    except Exception as e:
        print(f"✗ Wisal handler with no input failed: {e}")

    # ===========================
    # Test 9: Error Handling and Edge Cases
    # ===========================
    section("TEST 9: ERROR HANDLING AND EDGE CASES")

    # Test with very long query
    print("Testing with very long query...")
    try:
        long_query = "autism " * 100 + "what are the symptoms and treatments?"
        result = main_pipeline_interface(long_query)
        print("✓ Long query handled successfully")
        print(f" Response preview: {result[:100]}...")
    except Exception as e:
        print(f"✓ Long query error handled: {e}")

    # Test with special characters in query
    print("\nTesting with special characters...")
    try:
        special_query = "What about autism? 🧩💙 #autism @support"
        result = main_pipeline_interface(special_query)
        print("✓ Special characters handled successfully")
        print(f" Response preview: {result[:100]}...")
    except Exception as e:
        print(f"✓ Special characters error handled: {e}")

    # Test with non-existent file
    print("\nTesting with non-existent file...")
    try:
        result = main_pipeline_with_doc(
            "Analyze this document",
            "non_existent_file.txt",
            "Knowledge Document"
        )
        print(f"✓ Non-existent file handled: {result[:100]}...")
    except Exception as e:
        print(f"✓ Non-existent file error handled: {e}")

    # Test with invalid document type
    print("\nTesting with invalid document type...")
    try:
        if os.path.exists(txt_test):
            result = main_pipeline_with_doc(
                "Test query",
                txt_test,
                "Invalid Document Type"
            )
            print(f"✓ Invalid document type handled: {result}")
        else:
            print("⚠ Skipping invalid document type test - no test file")
    except Exception as e:
        print(f"✓ Invalid document type error handled: {e}")

    # ===========================
    # Test 10: Configuration and Environment
    # ===========================
    section("TEST 10: CONFIGURATION AND ENVIRONMENT")

    print("Checking environment variables...")
    env_vars = [
        "SILICONFLOW_API_KEY",
        "SILICONFLOW_URL",
        "SILICONFLOW_CHAT_URL",
        "ENVIRONMENT"
    ]

    for var in env_vars:
        value = os.getenv(var)
        if value:
            print(f"✓ {var}: Set (length: {len(value)})")
        else:
            print(f"✗ {var}: Not set")

    print("\nChecking global variables...")
    try:
        print(f"✓ Environment: {env}")
        print(f"✓ Session ID: {SESSION_ID}")
        print(f"✓ Valid doc types: {len(VALID_DOC_TYPES)} types")
        print(f"✓ Pending clarifications: {type(pending_clarifications)}")

        # Check VALID_DOC_TYPES mapping
        for key, value in VALID_DOC_TYPES.items():
            print(f" - {key}: {value}")
    except Exception as e:
        print(f"✗ Global variables error: {e}")

    # Check test files availability
    print("\nChecking test files availability...")
    test_files_check = [
        (pdf_test, "PDF"),
        (docs_test, "DOCX"),
        (txt_test, "TXT")
    ]

    available_files = 0
    for file_path, file_type in test_files_check:
        if os.path.exists(file_path):
            size = os.path.getsize(file_path)
            print(f"✓ {file_type} test file available: {os.path.basename(file_path)} ({size} bytes)")
            available_files += 1
        else:
            print(f"✗ {file_type} test file missing: {file_path}")

    print(f" Available test files: {available_files}/{len(test_files_check)}")

    # ===========================
    # Test Summary
    # ===========================
    section("TEST SUMMARY", width=70)
    print("✓ Utility functions tested")
    print("✓ File upload and saving with real files verified")
    print("✓ Document processing by type with real files checked")
    print("✓ Main pipeline interface tested")
    print("✓ Pipeline with document and history verified")
    print("✓ History management tested")
    print("✓ Unified handler functionality checked")
    print("✓ Wisal handler tested")
    print("✓ Error handling and edge cases validated")
    print("✓ Configuration and environment checked")
    print(f"{'=' * 70}")
    print("UTILS/FUNCTIONS TEST SUITE COMPLETED")
    print(f"{'=' * 70}")

    # Cleanup uploaded test files
    try:
        upload_dir = get_upload_directory()
        if os.path.exists(upload_dir):
            test_file_count = 0
            for file in os.listdir(upload_dir):
                if file.startswith(('test_', 'temp_')):
                    os.remove(os.path.join(upload_dir, file))
                    test_file_count += 1
            if test_file_count > 0:
                print(f"✓ Cleaned up {test_file_count} test files")
            else:
                print("✓ No test files to clean up")
    except Exception as e:
        print(f"✗ Cleanup warning: {e}")


if __name__ == "__main__":
    _run_self_tests()