Spaces:
Sleeping
Sleeping
| import os | |
| import gradio as gr | |
| import pandas as pd | |
| import numpy as np | |
| from pathlib import Path | |
| import time | |
| import hashlib | |
| from datetime import datetime | |
| import torch | |
| from PIL import Image | |
| import faiss | |
| # 必要なライブラリをインポート | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.embeddings import HuggingFaceEmbeddings | |
| from langchain.vectorstores import Chroma | |
| from langchain.chains import RetrievalQA | |
| from langchain.prompts import PromptTemplate | |
| from langchain.llms import HuggingFacePipeline | |
| from langchain.schema import Document | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
| from transformers import T5ForConditionalGeneration, T5Tokenizer | |
| from sentence_transformers import SentenceTransformer | |
| # 条件付きインポート(ローカル環境とHugging Face Spacesの両方に対応) | |
| try: | |
| import fitz # PyMuPDF | |
| PYMUPDF_AVAILABLE = True | |
| except ImportError: | |
| PYMUPDF_AVAILABLE = False | |
| print("PyMuPDFが利用できません。PDFファイルはテキスト抽出のみで処理されます。") | |
| try: | |
| import easyocr | |
| EASYOCR_AVAILABLE = True | |
| except ImportError: | |
| EASYOCR_AVAILABLE = False | |
| print("EasyOCRが利用できません。OCR機能は無効化されます。") | |
| try: | |
| import cv2 | |
| CV2_AVAILABLE = True | |
| except ImportError: | |
| CV2_AVAILABLE = False | |
| print("OpenCVが利用できません。画像処理機能は制限されます。") | |
| class ManualChatbot: | |
| def __init__(self, docs_dir="./manuals"): | |
| """手順書チャットボットの初期化""" | |
| self.docs_dir = docs_dir | |
| self.vectorstore = None # ベクトルデータベースの初期化 | |
| self.file_hashes = {} # ファイルのハッシュ値を保持する辞書 | |
| self.last_update_check = None # 最後に更新をチェックした時間 | |
| self.processing_status = "未初期化" | |
| # 使用するモデルの次元数を一貫させる | |
| self.embedding_model_name = "intfloat/multilingual-e5-base" | |
| self.embedding_dimension = 1024 # このモデルの次元数 | |
| # ディレクトリが存在しなければ作成 | |
| os.makedirs(docs_dir, exist_ok=True) | |
| os.makedirs("./chroma_db", exist_ok=True) | |
| # ファイルハッシュの記録ファイルパス | |
| self.hash_file_path = os.path.join(os.path.dirname(docs_dir), "file_hashes.json") | |
| # OCRの初期化(可能な場合) | |
| if EASYOCR_AVAILABLE: | |
| self.reader = easyocr.Reader(['ja', 'en']) # 日本語と英語に対応 | |
| print("EasyOCRを初期化しました") | |
| else: | |
| self.reader = None | |
| # 要約用の T5 モデル準備(モデルサイズを小さくしてHF Spacesでの動作に最適化) | |
| self.summarizer_model = None | |
| self.summarizer_tokenizer = None | |
| # ハッシュ読み込み | |
| self._load_file_hashes() | |
| def _load_file_hashes(self): | |
| """保存されたファイルハッシュを読み込む""" | |
| if os.path.exists(self.hash_file_path): | |
| try: | |
| import json | |
| with open(self.hash_file_path, 'r') as f: | |
| self.file_hashes = json.load(f) | |
| print(f"{len(self.file_hashes)}件のファイルハッシュを読み込みました") | |
| except Exception as e: | |
| print(f"ファイルハッシュの読み込みに失敗しました: {str(e)}") | |
| self.file_hashes = {} | |
| else: | |
| self.file_hashes = {} | |
| def _save_file_hashes(self): | |
| """ファイルハッシュを保存する""" | |
| try: | |
| import json | |
| with open(self.hash_file_path, 'w') as f: | |
| json.dump(self.file_hashes, f) | |
| print(f"{len(self.file_hashes)}件のファイルハッシュを保存しました") | |
| except Exception as e: | |
| print(f"ファイルハッシュの保存に失敗しました: {str(e)}") | |
| def _get_file_hash(self, file_path): | |
| """ファイルのMD5ハッシュを計算する""" | |
| hash_md5 = hashlib.md5() | |
| with open(file_path, "rb") as f: | |
| for chunk in iter(lambda: f.read(4096), b""): | |
| hash_md5.update(chunk) | |
| return hash_md5.hexdigest() | |
| def process_uploaded_files(self, files): | |
| """ | |
| Gradioからアップロードされたファイルを処理する | |
| :param files: アップロードされたファイルのリスト | |
| :return: 処理状況を示すメッセージ | |
| """ | |
| if not files: | |
| return "ファイルがアップロードされていません" | |
| self.processing_status = "処理中..." | |
| # 新しく追加されたファイルを一時的に保存し処理する | |
| file_paths = [] | |
| for file in files: | |
| if file is None: | |
| continue | |
| # ファイル拡張子を確認 | |
| filename = getattr(file, "orig_name", os.path.basename(file.name)) #file.name | |
| file_ext = os.path.splitext(filename)[1].lower() | |
| if file_ext not in ['.pdf', '.xlsx', '.xls', '.png', '.jpg', '.jpeg']: | |
| continue | |
| # ファイルを保存する | |
| save_path = os.path.join(self.docs_dir, os.path.basename(filename)) | |
| with open(save_path, 'wb') as f: | |
| f.write(file.read()) | |
| file_paths.append(save_path) | |
| if not file_paths: | |
| self.processing_status = "サポートされているファイルがありませんでした" | |
| return "サポートされているファイルがありませんでした(.pdf, .xlsx, .xls, .png, .jpg, .jpeg)" | |
| # ファイルを処理して知識ベースを更新 | |
| self.update_knowledge_base(file_paths) | |
| self.processing_status = "準備完了" | |
| return f"{len(file_paths)}個のファイルが処理され、知識ベースに追加されました" | |
| def update_knowledge_base(self, file_paths): | |
| """ | |
| 指定したファイルから新しいデータを読み込み、インデックスを更新する | |
| :param file_paths: 更新したファイルのパス一覧(リスト) | |
| """ | |
| print(f"{len(file_paths)}件のファイルを処理します...") | |
| new_documents = [] | |
| for file_path in file_paths: | |
| if file_path.lower().endswith(".pdf"): | |
| new_documents.extend(self._process_pdf(file_path)) | |
| elif file_path.lower().endswith((".xlsx", ".xls")): | |
| new_documents.extend(self._process_excel(file_path)) | |
| elif file_path.lower().endswith((".png", ".jpg", ".jpeg")): | |
| new_documents.extend(self._process_image(file_path)) | |
| if not new_documents: | |
| print("処理対象のドキュメントがありませんでした") | |
| return | |
| print(f"{len(new_documents)}件のドキュメントを処理しました") | |
| # テキスト分割 | |
| text_splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=1000, | |
| chunk_overlap=200, | |
| separators=["\n\n", "\n", "。", "、", " ", ""] | |
| ) | |
| chunks = text_splitter.split_documents(new_documents) | |
| print(f"{len(chunks)}個のテキストチャンクに分割しました") | |
| # 埋め込みモデルの初期化 | |
| embeddings = HuggingFaceEmbeddings( | |
| model_name=self.embedding_model_name, | |
| model_kwargs={'device': 'cpu'} | |
| ) | |
| # ベクトルストアの初期化/更新 | |
| if self.vectorstore is None: | |
| # 初めての場合は新規作成 | |
| try: | |
| self.vectorstore = Chroma.from_documents( | |
| documents=chunks, | |
| embedding=embeddings, | |
| persist_directory="./chroma_db" | |
| ) | |
| self.vectorstore.persist() | |
| except Exception as e: | |
| print(f"ベクトルストア作成中にエラー発生: {str(e)}") | |
| # 既存のchroma_dbディレクトリを削除して再作成する | |
| import shutil | |
| if os.path.exists("./chroma_db"): | |
| shutil.rmtree("./chroma_db") | |
| os.makedirs("./chroma_db", exist_ok=True) | |
| # 再度作成を試みる | |
| self.vectorstore = Chroma.from_documents( | |
| documents=chunks, | |
| embedding=embeddings, | |
| persist_directory="./chroma_db" | |
| ) | |
| self.vectorstore.persist() #else: | |
| # 既存のベクトルストアに新しいドキュメントを追加 | |
| self.vectorstore.add_documents(chunks) | |
| # ベクトルストアを保存 | |
| self.vectorstore.persist() | |
| # もしQAチェーンがなければ初期化 | |
| if not hasattr(self, 'qa_chain') or self.qa_chain is None: | |
| self._initialize_qa_chain() | |
| else: | |
| # QAチェーンを更新された検索エンジンで更新 | |
| self.qa_chain.retriever = self.vectorstore.as_retriever(search_kwargs={"k": 3}) | |
| print("知識ベースを更新しました!") | |
| def _process_pdf(self, file_path): | |
| """PDFファイルを処理してドキュメントを返す""" | |
| try: | |
| # PyMuPDFが利用可能な場合 | |
| if PYMUPDF_AVAILABLE: | |
| doc = fitz.open(file_path) | |
| all_text = "" | |
| for page_num, page in enumerate(doc): | |
| text = page.get_text() | |
| all_text += f"--- Page {page_num + 1} ---\n{text}\n\n" | |
| # OCRが必要か確認(テキストが少ない場合) | |
| if len(all_text.strip()) < 100 and EASYOCR_AVAILABLE and self.reader: | |
| all_text = self.extract_text_from_pdf_with_ocr(file_path) | |
| return [Document(page_content=all_text, metadata={"source": file_path})] | |
| else: | |
| # 簡易処理(PyMuPDFが利用できない場合) | |
| # 注意: この場合はPDFの内容を適切に抽出できない可能性がある | |
| return [Document(page_content=f"PDF file: {os.path.basename(file_path)}", | |
| metadata={"source": file_path})] | |
| except Exception as e: | |
| print(f"PDFファイルの処理中にエラーが発生しました ({file_path}): {str(e)}") | |
| return [] | |
| def _process_excel(self, file_path): | |
| """Excelファイルを処理してドキュメントを返す""" | |
| try: | |
| # Pandas でExcelを読み込む | |
| dfs = pd.read_excel(file_path, sheet_name=None, engine="openpyxl" if file_path.endswith(".xlsx") else "xlrd") | |
| documents = [] | |
| for sheet_name, df in dfs.items(): | |
| # NaN値を空文字列に変換 | |
| df = df.fillna('') | |
| # 各行をテキストに変換 | |
| for idx, row in df.iterrows(): | |
| content = f"Sheet: {sheet_name}, Row: {idx}\n" | |
| for col in df.columns: | |
| content += f"{col}: {row[col]}\n" | |
| doc = Document( | |
| page_content=content, | |
| metadata={"source": file_path, "sheet": sheet_name, "row": idx} | |
| ) | |
| documents.append(doc) | |
| return documents | |
| except Exception as e: | |
| print(f"Excelファイルの処理中にエラーが発生しました ({file_path}): {str(e)}") | |
| return [] | |
| def _process_image(self, file_path): | |
| """画像ファイルを処理してドキュメントを返す""" | |
| try: | |
| if not EASYOCR_AVAILABLE or not self.reader: | |
| return [Document( | |
| page_content=f"画像ファイル: {os.path.basename(file_path)} (OCR未対応)", | |
| metadata={"source": file_path} | |
| )] | |
| img = Image.open(file_path) | |
| # EasyOCRで画像からテキストを抽出 | |
| result = self.reader.readtext(np.array(img)) | |
| # 抽出されたテキストを結合 | |
| text = "\n".join([detection[1] for detection in result]) | |
| if not text.strip(): | |
| text = f"画像ファイル: {os.path.basename(file_path)} (テキスト検出なし)" | |
| # ドキュメントとしてリストに追加 | |
| return [Document(page_content=text, metadata={"source": file_path})] | |
| except Exception as e: | |
| print(f"画像ファイルの処理中にエラーが発生しました ({file_path}): {str(e)}") | |
| return [] | |
| def _initialize_qa_chain(self): | |
| """QAチェーンを初期化する""" | |
| try: | |
| # LLMの初期化(小さいモデルを使用) | |
| model_name = "cyberagent/open-calm-small" # 日本語対応の小さいモデル | |
| tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| model = AutoModelForCausalLM.from_pretrained(model_name) | |
| pipe = pipeline( | |
| "text-generation", | |
| model=model, | |
| tokenizer=tokenizer, | |
| max_new_tokens=300, | |
| temperature=0.7, | |
| do_sample=True, | |
| device="cpu" # Spaces環境ではCPU使用 | |
| ) | |
| local_llm = HuggingFacePipeline(pipeline=pipe) | |
| # プロンプトテンプレート | |
| template = """ | |
| 次の手順書データを使って質問に答えてください。 | |
| ### 手順書データ: | |
| {context} | |
| ### 質問: | |
| {question} | |
| ### 回答: | |
| """ | |
| prompt = PromptTemplate( | |
| template=template, | |
| input_variables=["context", "question"] | |
| ) | |
| # QAチェーンの作成 | |
| self.qa_chain = RetrievalQA.from_chain_type( | |
| llm=local_llm, | |
| chain_type="stuff", | |
| retriever=self.vectorstore.as_retriever(search_kwargs={"k": 3}), | |
| chain_type_kwargs={"prompt": prompt}, | |
| return_source_documents=True | |
| ) | |
| print("QAチェーンを初期化しました") | |
| except Exception as e: | |
| print(f"QAチェーンの初期化中にエラーが発生しました: {str(e)}") | |
| self.qa_chain = None | |
| def extract_text_from_pdf_with_ocr(self, pdf_path): | |
| """PDFファイルからテキストを抽出し、必要に応じてOCRを適用する""" | |
| if not PYMUPDF_AVAILABLE or not EASYOCR_AVAILABLE or not self.reader: | |
| return f"PDF: {os.path.basename(pdf_path)} (OCR未対応)" | |
| doc = fitz.open(pdf_path) | |
| full_text = "" | |
| for page_num, page in enumerate(doc): | |
| # テキストの抽出を試みる | |
| text = page.get_text() | |
| # テキストが少ない場合はOCRを適用する | |
| if len(text.strip()) < 50: # 少ないテキストの閾値 | |
| # ページを画像として抽出 | |
| pix = page.get_pixmap() | |
| img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) | |
| img_np = np.array(img) | |
| # EasyOCRを使用してテキスト抽出 | |
| result = self.reader.readtext(img_np) | |
| ocr_text = "\n".join([detection[1] for detection in result]) | |
| # OCRテキストを使用 | |
| text = ocr_text if ocr_text.strip() else text | |
| full_text += f"--- Page {page_num + 1} ---\n{text}\n\n" | |
| return full_text | |
| def ask(self, question): | |
| """質問をボットに問いかけ、回答と参照ソースを取得する""" | |
| if not hasattr(self, 'qa_chain') or self.qa_chain is None: | |
| return "チャットボットがまだ初期化されていません。ファイルをアップロードしてください。", "" | |
| try: | |
| result = self.qa_chain.invoke({"query": question}) #({"question": question}) | |
| # 回答の取得 | |
| if "result" in result: | |
| answer = result["result"] | |
| else: | |
| return "回答を生成できませんでした。", "" | |
| # 参照ソースの取得 | |
| source_documents = result.get("source_documents", []) | |
| sources_text = "" | |
| if source_documents: | |
| sources_text = "参照ソース:\n" | |
| for i, doc in enumerate(source_documents, 1): | |
| source = doc.metadata.get("source", "不明") | |
| filename = os.path.basename(source) | |
| sources_text += f"{i}. {filename}\n" | |
| return answer, sources_text | |
| except Exception as e: | |
| return f"エラーが発生しました: {str(e)}", "" | |
| def load(self): | |
| """保存済みのベクトルストアを読み込む""" | |
| if os.path.exists("./chroma_db"): | |
| try: | |
| # 埋め込みモデルの初期化 - 一貫したモデルを使用 | |
| embeddings = HuggingFaceEmbeddings( | |
| model_name=self.embedding_model_name, | |
| model_kwargs={'device': 'cpu'} | |
| ) | |
| self.vectorstore = Chroma( | |
| persist_directory="./chroma_db", | |
| embedding_function=embeddings | |
| ) | |
| # QAチェーンを初期化 | |
| self._initialize_qa_chain() | |
| self.processing_status = "準備完了" | |
| return "保存済みの知識ベースを読み込みました" | |
| except Exception as e: | |
| import traceback | |
| error_details = traceback.format_exc() | |
| print(f"知識ベース読み込みエラー: {str(e)}\n{error_details}") | |
| # エラーの場合、chroma_dbディレクトリを削除して新規作成する選択肢も | |
| self.processing_status = "エラー" | |
| return f"知識ベースの読み込みに失敗しました: {str(e)}" | |
| else: | |
| self.processing_status = "初期化待ち" | |
| return "知識ベースが見つかりません。ファイルをアップロードしてください。" | |
| # Gradioインターフェースの作成 | |
| def create_interface(): | |
| # チャットボットのインスタンスを作成 | |
| bot = ManualChatbot(docs_dir="./manuals") | |
| # 保存済みデータがあれば読み込む | |
| load_status = bot.load() | |
| # Gradioインターフェース | |
| with gr.Blocks(title="手順書チャットボット") as demo: | |
| gr.Markdown("# 手順書チャットボット") | |
| gr.Markdown("PDFやExcel、画像ファイルをアップロードして、それらの内容に関する質問に答えます。") | |
| with gr.Tab("ファイルアップロード"): | |
| upload_files = gr.File(file_count="multiple", label="PDFやExcel、画像ファイルをアップロード") | |
| upload_button = gr.Button("処理開始") | |
| status_output = gr.Textbox(label="ステータス", value=load_status) | |
| upload_button.click( | |
| fn=bot.process_uploaded_files, | |
| inputs=[upload_files], | |
| outputs=[status_output] | |
| ) | |
| with gr.Tab("チャット"): | |
| chatbot = gr.Chatbot(label="会話") | |
| msg = gr.Textbox(label="質問を入力してください") | |
| clear = gr.Button("クリア") | |
| def respond(message, chat_history): | |
| if not message.strip(): | |
| return chat_history | |
| # ボットに質問する | |
| bot_response, sources = bot.ask(message) | |
| # 回答とソース情報を組み合わせる | |
| full_response = bot_response | |
| if sources: | |
| full_response += f"\n\n{sources}" | |
| # チャット履歴を更新する | |
| chat_history.append((message, full_response)) | |
| return "", chat_history | |
| msg.submit(respond, [msg, chatbot], [msg, chatbot]) | |
| clear.click(lambda: None, None, chatbot, queue=False) | |
| with gr.Tab("使い方"): | |
| gr.Markdown(""" | |
| ## 使い方 | |
| 1. **ファイルアップロード**タブで、PDFファイル、Excelファイル、または画像ファイルをアップロードします。 | |
| 2. **処理開始**ボタンをクリックして、ファイルを処理します。 | |
| 3. 処理が完了したら**チャット**タブに移動します。 | |
| 4. 質問を入力して、手順書の内容に基づいた回答を得ることができます。 | |
| ## サポートしているファイル形式 | |
| - PDF (.pdf) | |
| - Excel (.xlsx, .xls) | |
| - 画像ファイル (.png, .jpg, .jpeg) | |
| ## 注意事項 | |
| - 大きなファイルの処理には時間がかかる場合があります。 | |
| - 画像からのテキスト抽出(OCR)は言語によって精度が異なります。 | |
| - 回答は参照元のドキュメントに基づいて生成されるため、データが不十分な場合は正確な回答ができない場合があります。 | |
| """) | |
| return demo | |
| # Hugging Face Spacesで実行する場合のエントリーポイント | |
| if __name__ == "__main__": | |
| # Gradioインターフェースを作成して起動 | |
| demo = create_interface() | |
| demo.launch() |