import os
import logging
import json
import argparse
import sys
from typing import List, Dict, Optional

from langchain.text_splitter import RecursiveCharacterTextSplitter

# MODIFIED: Import the text extraction utilities to avoid code duplication.
from utils import extract_text_from_file, FAISS_RAG_SUPPORTED_EXTENSIONS

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# Note: 'extract_text_from_file' and the 'SUPPORTED_EXTENSIONS' dictionary have
# been removed from this file and are now imported from 'utils.py' to ensure a
# single source of truth for file-processing logic.


def process_sources_and_create_chunks(
    sources_dir: str,
    output_file: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 150,
    text_output_dir: Optional[str] = None
) -> None:
    """
    Scans a directory for source files, extracts text, splits it into chunks,
    and saves the chunks to a single JSON file.

    Optionally saves the raw extracted text to a specified directory.
    """
    if not os.path.isdir(sources_dir):
        logger.error(f"Source directory not found: '{sources_dir}'")
        raise FileNotFoundError(f"Source directory not found: '{sources_dir}'")

    logger.info(f"Starting chunking process. Sources: '{sources_dir}', Output: '{output_file}'")

    if text_output_dir:
        os.makedirs(text_output_dir, exist_ok=True)
        logger.info(f"Will save raw extracted text to: '{text_output_dir}'")

    all_chunks_for_json: List[Dict] = []
    processed_files_count = 0
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)

    for filename in os.listdir(sources_dir):
        file_path = os.path.join(sources_dir, filename)
        if not os.path.isfile(file_path):
            continue

        file_ext = filename.split('.')[-1].lower()
        if file_ext not in FAISS_RAG_SUPPORTED_EXTENSIONS:
            logger.debug(f"Skipping unsupported file: {filename}")
            continue

        logger.info(f"Processing source file: {filename}")
        # MODIFIED: Use the imported extraction function.
        text_content = extract_text_from_file(file_path)

        if text_content:
            if text_output_dir:
                try:
                    text_output_path = os.path.join(text_output_dir, f"{filename}.txt")
                    with open(text_output_path, 'w', encoding='utf-8') as f_text:
                        f_text.write(text_content)
                    logger.info(f"Saved extracted text for '{filename}' to '{text_output_path}'")
                except Exception as e_text_save:
                    logger.error(f"Could not save extracted text for '{filename}': {e_text_save}")

            chunks = text_splitter.split_text(text_content)
            if not chunks:
                logger.warning(f"No chunks generated from {filename}. Skipping.")
                continue

            for i, chunk_text in enumerate(chunks):
                chunk_data = {
                    "page_content": chunk_text,
                    "metadata": {
                        "source_document_name": filename,
                        "chunk_index": i,
                        "full_location": f"{filename}, Chunk {i+1}"
                    }
                }
                all_chunks_for_json.append(chunk_data)
            processed_files_count += 1
        else:
            logger.warning(f"Could not extract text from {filename}. Skipping.")

    if not all_chunks_for_json:
        logger.warning(f"No processable documents found or no text extracted in '{sources_dir}'. JSON file will be empty.")

    # Guard against an empty dirname (e.g. when output_file is a bare filename).
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(all_chunks_for_json, f, indent=2)
Processed {processed_files_count} files.") logger.info(f"Created a total of {len(all_chunks_for_json)} chunks.") logger.info(f"Chunked JSON output saved to: {output_file}") def main(): parser = argparse.ArgumentParser(description="Process source documents into a JSON file of text chunks for RAG.") parser.add_argument( '--sources-dir', type=str, required=True, help="The directory containing source files (PDFs, DOCX, TXT)." ) parser.add_argument( '--output-file', type=str, required=True, help="The full path for the output JSON file containing the chunks." ) parser.add_argument( '--text-output-dir', type=str, default=None, help="Optional: The directory to save raw extracted text files for debugging." ) parser.add_argument( '--chunk-size', type=int, default=1000, help="The character size for each text chunk." ) parser.add_argument( '--chunk-overlap', type=int, default=150, help="The character overlap between consecutive chunks." ) args = parser.parse_args() try: process_sources_and_create_chunks( sources_dir=args.sources_dir, output_file=args.output_file, chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap, text_output_dir=args.text_output_dir ) except Exception as e: logger.critical(f"A critical error occurred during the chunking process: {e}", exc_info=True) exit(1) if __name__ == "__main__": main()