#!/usr/bin/env python3 # -*- coding: utf-8 -*- import streamlit as st import pandas as pd import sys import os import json import re import logging import requests import markdown import time import io import random import hashlib from datetime import datetime from typing import Iterator, List, Dict, Any, Generator, Tuple from itertools import combinations import urllib.parse from dataclasses import dataclass import PyPDF2 from collections import Counter from docx import Document from docx.shared import Pt, Mm from docx.enum.text import WD_ALIGN_PARAGRAPH import numpy as np import matplotlib.pyplot as plt import matplotlib.patches as patches from matplotlib.patches import FancyBboxPatch, Rectangle, Circle, Arrow, Polygon import matplotlib.font_manager as fm import graphviz from tempfile import NamedTemporaryFile import base64 # Gradio Client for image generation try: from gradio_client import Client except ImportError: Client = None logging.warning("gradio_client not installed. Image generation will be disabled.") # Friendli AI imports try: from openai import OpenAI, APIError, APITimeoutError except ImportError: logging.warning("openai package not installed.") OpenAI = None APIError = Exception APITimeoutError = Exception import tempfile import glob import shutil # Additional libraries try: import pyarrow.parquet as pq except ImportError: logging.warning("pyarrow not installed. Parquet file reading will be disabled.") pq = None try: from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity SKLEARN_AVAILABLE = True except ImportError: logging.warning("scikit-learn not installed. Some features will be disabled.") TfidfVectorizer = None cosine_similarity = None SKLEARN_AVAILABLE = False # Network stability libraries try: import httpx from httpx import RemoteProtocolError except ImportError: logging.warning("httpx not installed.") httpx = None RemoteProtocolError = Exception # Backoff fallback try: import backoff except ImportError: logging.warning("`backoff` module is missing. Using a simple fallback decorator.") def _simple_backoff_on_exception(exceptions, *args, **kwargs): max_tries = kwargs.get("max_tries", 3) base = kwargs.get("base", 2) def decorator(fn): def wrapper(*f_args, **f_kwargs): attempt = 0 while True: try: return fn(*f_args, **f_kwargs) except exceptions as e: attempt += 1 if attempt >= max_tries: raise sleep = base ** attempt logging.info(f"[retry {attempt}/{max_tries}] {fn.__name__} -> {e} … waiting {sleep}s") time.sleep(sleep) return wrapper return decorator class _DummyBackoff: on_exception = _simple_backoff_on_exception backoff = _DummyBackoff() # Streamlit page configuration st.set_page_config( page_title="Ilúvatar: Patent-Focused Creative Design & Invention AI", layout="wide", initial_sidebar_state="expanded" ) # Environment Variables FIREWORKS_API_KEY = os.getenv("FIREWORKS_API_KEY", "") FIREWORKS_API_URL = os.getenv("FIREWORKS_API_URL", "") FIREWORKS_MODEL = os.getenv("FIREWORKS_MODEL", "") BRAVE_KEY = os.getenv("BAPI_TOKEN", "") SERPHOUSE_KEY = os.getenv("SERPHOUSE_API_KEY", "") IMAGE_API_URL = os.getenv("IMAGE_API_URL") MAX_TOKENS = 7999 BRAVE_ENDPOINT = "https://api.search.brave.com/res/v1/web/search" # Logging setup LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") logging.basicConfig( level=getattr(logging, LOG_LEVEL.upper(), logging.INFO), format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) # SOMA System Integration class SOMASystem: """Self-Orchestrating Modular Architect System""" def __init__(self, api_key: str): self.api_key = api_key self.api_url = FIREWORKS_API_URL self.model = FIREWORKS_MODEL self.test_mode = not api_key def create_headers(self): return { "Accept": "application/json", "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}" } def call_llm_streaming(self, messages: List[Dict[str, str]], role: str, max_tokens: int = 4096, language: str = "English") -> Generator[str, None, None]: """Streaming LLM API call using Fireworks AI""" if self.test_mode: test_response = f"[{role.upper()}] This is a test response for {role} AI." yield from self.simulate_streaming(test_response, role) return try: system_prompts = self.get_system_prompts(language) full_messages = [ {"role": "system", "content": system_prompts.get(role, "")}, *messages ] payload = { "model": self.model, "messages": full_messages, "max_tokens": max_tokens, "top_p": 1, "top_k": 40, "presence_penalty": 0, "frequency_penalty": 0, "temperature": 0.6, "stream": True } logging.info(f"Making API request to {self.api_url} for role {role}") response = requests.post( self.api_url, headers=self.create_headers(), data=json.dumps(payload), stream=True, timeout=60 ) if response.status_code != 200: error_msg = f"❌ API error ({response.status_code}): {response.text[:200]}" logging.error(error_msg) yield error_msg return buffer = "" chunk_count = 0 for line in response.iter_lines(): if line: line = line.decode('utf-8') if line.startswith("data: "): data = line[6:] if data == "[DONE]": if buffer: yield buffer break try: chunk = json.loads(data) if "choices" in chunk and chunk["choices"]: content = chunk["choices"][0].get("delta", {}).get("content", "") if content: buffer += content chunk_count += 1 if len(buffer) > 20 or '\n' in buffer or chunk_count % 5 == 0: yield buffer buffer = "" except json.JSONDecodeError as e: logging.warning(f"JSON decode error: {e} for line: {line[:100]}") continue if buffer: yield buffer except requests.exceptions.Timeout: error_msg = "❌ Request timeout - API response taking too long" logging.error(error_msg) yield error_msg except Exception as e: error_msg = f"❌ Error occurred: {str(e)}" logging.error(f"Error during streaming: {str(e)}", exc_info=True) yield error_msg def simulate_streaming(self, text: str, role: str) -> Generator[str, None, None]: """Simulate streaming in test mode""" words = text.split() chunk_size = 5 for i in range(0, len(words), chunk_size): chunk = " ".join(words[i:i+chunk_size]) yield chunk + " " time.sleep(0.02) def get_system_prompts(self, language: str = "English") -> Dict[str, str]: """Get system prompts for each AI role""" return { "supervisor": "You are a senior researcher directing inventions from a patent strategy perspective. You provide strategic guidance for prior art research and patentability enhancement.", "critic": "You are a patent expert evaluating inventions from a patent examiner's perspective. You strictly assess novelty, inventive step, and industrial applicability, and review patent specification draftability.", "creator": "You are an inventor creating patentable innovative inventions. You present creative and feasible solutions that overcome limitations of existing technology.", "researcher": "You are a professional researcher investigating patent information and prior art. You systematically analyze related patents, technology trends, and market needs.", "analyst": "You are a senior analyst building patent portfolios. You comprehensively evaluate the patentability, marketability, and technical value of inventions." } # ============================================================================ # ENHANCED SOMA COLLABORATION SYSTEM # ============================================================================ @dataclass class AgentMessage: """Agent communication message structure""" from_agent: str to_agent: str message_type: str content: str context: Dict[str, Any] timestamp: float @dataclass class CollaborationResult: """Collaboration result""" final_output: str collaboration_rounds: int agent_contributions: Dict[str, List[str]] fact_checks: List[Dict] consensus_score: float class EnhancedSOMASystem(SOMASystem): """Enhanced SOMA System - Autonomous collaboration and fact-checking""" def __init__(self, api_key: str): super().__init__(api_key) self.message_queue: List[AgentMessage] = [] self.collaboration_history: List[Dict] = [] self.fact_check_cache: Dict[str, bool] = {} def autonomous_collaboration( self, task: str, initial_data: Dict, max_rounds: int = 3, min_consensus: float = 0.8, language: str = "English" ) -> CollaborationResult: """Autonomous multi-agent collaboration""" collaboration_rounds = 0 agent_contributions = { "researcher": [], "supervisor": [], "critic": [], "creator": [], "analyst": [] } fact_checks = [] current_output = initial_data consensus_score = 0.0 for round_num in range(max_rounds): collaboration_rounds += 1 logging.info(f"Collaboration Round {round_num + 1}/{max_rounds}") round_results = {} # Researcher researcher_output = self._agent_work_with_questions( "researcher", task, current_output, language ) round_results["researcher"] = researcher_output agent_contributions["researcher"].append(researcher_output["output"]) # Creator creator_output = self._agent_work_with_questions( "creator", task, {**current_output, "research": researcher_output["output"]}, language ) round_results["creator"] = creator_output agent_contributions["creator"].append(creator_output["output"]) # Critic critic_output = self._critical_review_with_factcheck( task, {**current_output, "research": researcher_output["output"], "ideas": creator_output["output"]}, language ) round_results["critic"] = critic_output agent_contributions["critic"].append(critic_output["output"]) fact_checks.extend(critic_output.get("fact_checks", [])) # Supervisor supervisor_output = self._supervisor_synthesis( task, round_results, language ) round_results["supervisor"] = supervisor_output agent_contributions["supervisor"].append(supervisor_output["output"]) # Analyst analyst_output = self._analyst_evaluation( task, round_results, language ) round_results["analyst"] = analyst_output agent_contributions["analyst"].append(analyst_output["output"]) # Calculate consensus score consensus_score = self._calculate_consensus(round_results) logging.info(f"Round {round_num + 1} Consensus: {consensus_score:.2f}") if consensus_score >= min_consensus: logging.info(f"Consensus reached at round {round_num + 1}") break current_output = self._merge_round_results(round_results) final_output = self._generate_final_output( current_output, agent_contributions, language ) return CollaborationResult( final_output=final_output, collaboration_rounds=collaboration_rounds, agent_contributions=agent_contributions, fact_checks=fact_checks, consensus_score=consensus_score ) def _agent_work_with_questions( self, agent_role: str, task: str, context: Dict, language: str ) -> Dict: """Agent performs work including asking questions""" prompt = self._create_collaborative_prompt( agent_role, task, context, language ) output = "" questions_for_others = [] for chunk in self.call_llm_streaming( [{"role": "user", "content": prompt}], agent_role, max_tokens=4096, language=language ): output += chunk questions_for_others = self._extract_questions(output) answers = {} if questions_for_others: answers = self._get_answers_from_agents( agent_role, questions_for_others, context, language ) return { "output": output, "questions": questions_for_others, "answers_received": answers } def _critical_review_with_factcheck( self, task: str, context: Dict, language: str ) -> Dict: """Critical review with fact-checking""" prompt = f"""As a critical expert, rigorously review the following content and perform fact-checks: Task: {task} Provided Information: {json.dumps(context, ensure_ascii=False, indent=2)[:3000]} Please verify the following: 1. **Fact Check**: - Is there evidence for all claims? - Are statistics or numbers accurate? - Are cited sources credible? 2. **Logical Consistency**: - Are claims logically connected to evidence? - Are there any contradictions? 3. **Completeness**: - Is any important information missing? - Have alternative views been considered? 4. **Patentability Perspective**: - Is there sufficient evidence for novelty? - Are inventive step claims valid? For each item: - ✅ Verified: [evidence] - ⚠️ Questionable: [reason] - ❌ Error: [issue] Provide your assessment in this format.""" output = "" for chunk in self.call_llm_streaming( [{"role": "user", "content": prompt}], "critic", max_tokens=5000, language=language ): output += chunk fact_checks = self._parse_fact_checks(output) verified_facts = self._verify_with_web_search(fact_checks, context) return { "output": output, "fact_checks": fact_checks, "verified_facts": verified_facts } def _supervisor_synthesis( self, task: str, round_results: Dict, language: str ) -> Dict: """Supervisor synthesizes all agent results""" prompt = f"""As a supervisor, synthesize the following agent work results: Task: {task} Agent Results: {json.dumps({k: v.get('output', '')[:500] for k, v in round_results.items()}, ensure_ascii=False, indent=2)} Please perform the following: 1. **Integrate Strengths**: Identify and integrate excellent contributions from each agent 2. **Address Weaknesses**: Supplement deficiencies with other agents' results 3. **Resolve Conflicts**: Reconcile conflicting opinions between agents 4. **Set Priorities**: Select the most important ideas/improvements 5. **Suggest Next Steps**: Propose additional work needed Organize the synthesis results systematically.""" output = "" for chunk in self.call_llm_streaming( [{"role": "user", "content": prompt}], "supervisor", max_tokens=5000, language=language ): output += chunk return {"output": output} def _analyst_evaluation( self, task: str, round_results: Dict, language: str ) -> Dict: """Analyst evaluates collaboration quality""" prompt = f"""As an analysis expert, evaluate the quality of this collaboration round: Task: {task} Agent Results: {json.dumps({k: v.get('output', '')[:500] for k, v in round_results.items()}, ensure_ascii=False, indent=2)} Evaluate based on the following criteria (each out of 10 points): 1. **Completeness**: Degree of task goal achievement 2. **Originality**: New and innovative ideas 3. **Feasibility**: Actual implementation possibility 4. **Patentability**: Patent filing possibility 5. **Collaboration Synergy**: Effectiveness of inter-agent cooperation Provide the score and detailed rationale for each item, **Overall Score (out of 50)** and **Improvement directions for next round**.""" output = "" for chunk in self.call_llm_streaming( [{"role": "user", "content": prompt}], "analyst", max_tokens=4096, language=language ): output += chunk scores = self._extract_scores(output) return { "output": output, "scores": scores } def _create_collaborative_prompt( self, agent_role: str, task: str, context: Dict, language: str ) -> str: """Generate prompt for collaboration""" # Convert Category objects to JSON serializable format serializable_context = {} for key, value in context.items(): if key == 'categories': # Convert Category list to dictionary list serializable_context[key] = [ { 'name_en': cat.name_en, 'tags': cat.tags, 'items': cat.items[:5] # Only first 5 items } for cat in value[:10] # Only first 10 categories ] elif key == 'combinations': # Convert combinations to simple string list serializable_context[key] = [ f"{' + '.join([f'{c[0]}-{c[1]}' for c in combo[4]])}" for combo in value[:10] ] elif isinstance(value, (str, int, float, bool, type(None))): serializable_context[key] = value elif isinstance(value, dict): serializable_context[key] = str(value)[:500] else: serializable_context[key] = str(value)[:500] context_str = json.dumps(serializable_context, ensure_ascii=False, indent=2)[:2000] base_prompts = { "researcher": f"""As a research expert, perform the following task: Task: {task} Provided Context: {context_str} Actions: 1. Research related prior art and data 2. Summarize key findings 3. **Questions for other agents**: Specify information needed or items requiring verification in [QUESTION FOR {{role}}] format Example: [QUESTION FOR CREATOR] How can this technology be creatively utilized?""", "creator": f"""As a creative expert, perform the following task: Task: {task} Provided Context: {context_str} Actions: 1. Generate innovative ideas 2. Highlight differences from existing technology 3. **Questions for other agents**: Specify areas needing verification or additional information in [QUESTION FOR {{role}}] format Example: [QUESTION FOR CRITIC] Are there any patentability issues with this idea?""", "supervisor": f"""As a supervisor, perform the following task: Task: {task} Provided Context: {context_str} Actions: 1. Coordinate overall process 2. Coordinate between agents 3. Set priorities 4. **Instructions to other agents**: Specify additional work or improvements needed in [REQUEST TO {{role}}] format""", "analyst": f"""As an analysis expert, perform the following task: Task: {task} Provided Context: {context_str} Actions: 1. Analyze current results 2. Evaluate strengths and weaknesses 3. Suggest improvement directions 4. **Questions for other agents**: Additional information needed for evaluation in [QUESTION FOR {{role}}] format""" } return base_prompts.get(agent_role, "") def _extract_questions(self, output: str) -> List[Tuple[str, str]]: """Extract questions from output""" questions = [] pattern = r'\[QUESTION FOR ([A-Z]+)\](.*?)(?=\[|$)' matches = re.finditer(pattern, output, re.DOTALL | re.IGNORECASE) for match in matches: role = match.group(1).lower() question = match.group(2).strip() questions.append((role, question)) return questions def _get_answers_from_agents( self, asking_agent: str, questions: List[Tuple[str, str]], context: Dict, language: str ) -> Dict[str, str]: """Get answers from other agents""" answers = {} # Make context JSON serializable serializable_context = {} for key, value in context.items(): if key == 'categories': # Convert Category list to dictionary list if isinstance(value, list) and len(value) > 0: serializable_context[key] = [ { 'name_en': cat.name_en if hasattr(cat, 'name_en') else str(cat), 'tags': cat.tags if hasattr(cat, 'tags') else [], 'items': cat.items[:5] if hasattr(cat, 'items') else [] } for cat in value[:10] ] else: serializable_context[key] = [] elif key == 'combinations': # Convert combinations to simple string list if isinstance(value, list): serializable_context[key] = [ f"{' + '.join([f'{c[0]}-{c[1]}' for c in combo[4]])}" for combo in value[:10] if len(combo) > 4 ] else: serializable_context[key] = [] elif isinstance(value, (str, int, float, bool, type(None))): serializable_context[key] = value elif isinstance(value, dict): serializable_context[key] = str(value)[:500] else: serializable_context[key] = str(value)[:500] for target_role, question in questions: if target_role in ["researcher", "creator", "critic", "supervisor", "analyst"]: prompt = f"""{asking_agent.upper()} agent's question - please answer: Question: {question} Context: {json.dumps(serializable_context, ensure_ascii=False, indent=2)[:1500]} Provide a clear and concise answer.""" answer = "" for chunk in self.call_llm_streaming( [{"role": "user", "content": prompt}], target_role, max_tokens=2048, language=language ): answer += chunk answers[target_role] = answer return answers def _parse_fact_checks(self, critic_output: str) -> List[Dict]: """Parse fact-check results from critic output""" fact_checks = [] patterns = { 'verified': r'✅\s*Verified:(.+?)(?=\n[✅⚠️❌]|\Z)', 'suspicious': r'⚠️\s*Questionable:(.+?)(?=\n[✅⚠️❌]|\Z)', 'error': r'❌\s*Error:(.+?)(?=\n[✅⚠️❌]|\Z)' } for status, pattern in patterns.items(): matches = re.finditer(pattern, critic_output, re.DOTALL) for match in matches: content = match.group(1).strip() fact_checks.append({ 'status': status, 'content': content, 'timestamp': time.time() }) return fact_checks def _verify_with_web_search( self, fact_checks: List[Dict], context: Dict ) -> List[Dict]: """Verify facts with web search""" verified = [] for fact in fact_checks: if fact['status'] in ['suspicious', 'error']: search_query = fact['content'][:200] try: search_results = do_web_search(search_query) verified.append({ **fact, 'web_verification': search_results[:500], 'verified_at': time.time() }) except Exception as e: logging.error(f"Web verification error: {e}") verified.append({ **fact, 'web_verification': 'Verification failed', 'error': str(e) }) else: verified.append(fact) return verified def _extract_scores(self, analyst_output: str) -> Dict[str, float]: """Extract scores from analyst output""" scores = { 'completeness': 0, 'originality': 0, 'feasibility': 0, 'patentability': 0, 'synergy': 0, 'total': 0 } patterns = { 'completeness': r'Completeness[:\s]*(\d+(?:\.\d+)?)', 'originality': r'Originality[:\s]*(\d+(?:\.\d+)?)', 'feasibility': r'Feasibility[:\s]*(\d+(?:\.\d+)?)', 'patentability': r'Patentability[:\s]*(\d+(?:\.\d+)?)', 'synergy': r'Collaboration\s*Synergy[:\s]*(\d+(?:\.\d+)?)', 'total': r'Overall\s*Score[:\s]*(\d+(?:\.\d+)?)' } for key, pattern in patterns.items(): match = re.search(pattern, analyst_output, re.IGNORECASE) if match: try: scores[key] = float(match.group(1)) except: pass return scores def _calculate_consensus(self, round_results: Dict) -> float: """Calculate collaboration consensus score""" if 'analyst' in round_results: analyst_scores = round_results['analyst'].get('scores', {}) if analyst_scores.get('total', 0) > 0: return min(1.0, analyst_scores['total'] / 50.0) return 0.7 def _merge_round_results(self, round_results: Dict) -> Dict: """Merge round results - JSON serialization safe""" merged = {} for agent, result in round_results.items(): if isinstance(result, dict) and 'output' in result: # Store only text merged[agent] = result['output'][:2000] # Length limit elif isinstance(result, str): merged[agent] = result[:2000] return merged def _generate_final_output( self, current_output: Dict, agent_contributions: Dict, language: str ) -> str: """Generate final output""" final_prompt = f"""Synthesize all agent collaboration results and write a final report: Collaboration Results: {json.dumps(current_output, ensure_ascii=False, indent=2)[:5000]} Write in the following format: ## Collaboration Summary - Participating agents and contributions - Key findings - Consensus conclusions ## Final Results [Specific deliverables] ## Verification and Fact-checking [Verified facts and evidence] ## Patentability Assessment [Novelty, inventive step, industrial applicability] ## Next Steps [Additional work and improvement directions]""" final_output = "" for chunk in self.call_llm_streaming( [{"role": "user", "content": final_prompt}], "supervisor", max_tokens=8000, language=language ): final_output += chunk return final_output # Physical Categories loading function @st.cache_data(ttl=3600) def load_physical_categories(): """Load category data from environment variable or file (JSON only)""" seed_text = os.getenv("SEED_TEXT", "") if seed_text: if seed_text.endswith('.json'): try: with open(seed_text, 'r', encoding='utf-8') as f: logging.info(f"Loading categories from file: {seed_text}") return json.load(f) except FileNotFoundError: logging.error(f"Category file {seed_text} not found") raise FileNotFoundError(f"Required category file not found: {seed_text}") except json.JSONDecodeError as e: logging.error(f"Error decoding JSON file: {e}") raise elif seed_text.startswith(('http://', 'https://')): try: logging.info(f"Loading categories from URL: {seed_text}") response = requests.get(seed_text, timeout=10) response.raise_for_status() return response.json() except Exception as e: logging.error(f"Failed to load from URL: {e}") raise else: try: logging.info("Loading categories from JSON string") return json.loads(seed_text) except json.JSONDecodeError as e: logging.error("SEED_TEXT is not valid JSON") raise default_files = [ "physical_categories.json", "./physical_categories.json", "/app/physical_categories.json", os.path.join(os.path.dirname(__file__), "physical_categories.json") ] for filepath in default_files: if os.path.exists(filepath): try: with open(filepath, 'r', encoding='utf-8') as f: logging.info(f"Loading categories from default file: {filepath}") return json.load(f) except Exception as e: logging.error(f"Error loading from {filepath}: {e}") continue error_msg = "No physical_categories.json file found. Please ensure the file exists or set SEED_TEXT environment variable." logging.error(error_msg) raise FileNotFoundError(error_msg) try: physical_transformation_categories = load_physical_categories() except Exception as e: st.error(f"⚠️ Failed to load physical categories: {str(e)}") st.stop() @dataclass class Category: """Category data class""" name_ko: str name_en: str tags: List[str] items: List[str] CATEGORY_NAME_TRANSLATIONS = { "센서 기능": "Sensor Functions", "크기와 형태 변화": "Size and Shape Change", "표면 및 외관 변화": "Surface and Appearance Change", "물질의 상태 변화": "Material State Change", "움직임 특성 변화": "Movement Characteristics Change", "구조적 변화": "Structural Change", "공간 이동": "Spatial Movement", "시간 관련 변화": "Time-Related Change", "빛과 시각 효과": "Light and Visual Effects", "소리와 진동 효과": "Sound and Vibration Effects", "열 관련 변화": "Thermal Changes", "전기 및 자기 변화": "Electrical and Magnetic Changes", "화학적 변화": "Chemical Change", "생물학적 변화": "Biological Change", "환경 상호작용": "Environmental Interaction", "비즈니스 아이디어": "Business Ideas", "사용자 인터페이스 및 상호작용": "User Interface and Interaction", "데이터 및 정보 변환": "Data and Information Transformation", "인지 및 심리적 변화": "Cognitive and Psychological Changes", "에너지 변환 및 관리": "Energy Conversion and Management", "지속가능성 및 환경 영향": "Sustainability and Environmental Impact", "보안 및 프라이버시": "Security and Privacy", "사회적 상호작용 및 협업": "Social Interaction and Collaboration", "미학 및 감성 경험": "Aesthetics and Emotional Experience" } CATEGORY_TAGS = { "센서 기능": ["sensor", "detection"], "크기와 형태 변화": ["shape", "geometry"], "표면 및 외관 변화": ["surface", "appearance"], "물질의 상태 변화": ["material", "state"], "움직임 특성 변화": ["motion", "dynamics"], "구조적 변화": ["structure", "form"], "공간 이동": ["movement", "space"], "시간 관련 변화": ["time", "aging"], "빛과 시각 효과": ["light", "visual"], "소리와 진동 효과": ["sound", "vibration"], "열 관련 변화": ["heat", "thermal"], "전기 및 자기 변화": ["electric", "magnetic"], "화학적 변화": ["chemical", "reaction"], "생물학적 변화": ["bio", "living"], "환경 상호작용": ["environment", "interaction"], "비즈니스 아이디어": ["business", "idea"], "사용자 인터페이스 및 상호작용": ["interface", "interaction"], "데이터 및 정보 변환": ["data", "information"], "인지 및 심리적 변화": ["cognitive", "psychology"], "에너지 변환 및 관리": ["energy", "power"], "지속가능성 및 환경 영향": ["sustainability", "eco"], "보안 및 프라이버시": ["security", "privacy"], "사회적 상호작용 및 협업": ["social", "collaboration"], "미학 및 감성 경험": ["aesthetics", "emotion"] } PHYS_CATEGORIES = [] for name_ko, items in physical_transformation_categories.items(): category = Category( name_ko=name_ko, name_en=CATEGORY_NAME_TRANSLATIONS.get(name_ko, name_ko), tags=CATEGORY_TAGS.get(name_ko, []), items=items ) PHYS_CATEGORIES.append(category) logging.info(f"Successfully loaded {len(PHYS_CATEGORIES)} physical transformation categories") logging.info(f"Total items across all categories: {sum(len(cat.items) for cat in PHYS_CATEGORIES)}") # Web search functions @st.cache_data(ttl=3600) def brave_search(query: str, count: int = 20): if not BRAVE_KEY: raise RuntimeError("⚠️ BAPI_TOKEN (Brave API Key) is missing.") headers = { "Accept": "application/json", "Accept-Encoding": "gzip", "X-Subscription-Token": BRAVE_KEY } params = {"q": query, "count": str(count)} for attempt in range(3): try: r = requests.get(BRAVE_ENDPOINT, headers=headers, params=params, timeout=15) r.raise_for_status() data = r.json() raw = data.get("web", {}).get("results") or data.get("results", []) if not raw: raise ValueError("No search results found.") arts = [] for i, res in enumerate(raw[:count], 1): url = res.get("url", res.get("link", "")) host = re.sub(r"https?://(www\.)?", "", url).split("/")[0] arts.append({ "index": i, "title": res.get("title", "No title"), "link": url, "snippet": res.get("description", res.get("text", "No snippet")), "displayed_link": host }) return arts except Exception as e: logging.error(f"Brave search failure (attempt {attempt+1}/3): {e}") time.sleep(1) return [] def mock_results(query: str) -> str: ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") return ( f"# Fallback Search Content (Generated: {ts})\n\n" f"The web search API request failed. Please generate the patent ideas based on '{query}' using general knowledge.\n\n" f"Note: This is fallback text, not real-time data.\n\n" ) def do_web_search(query: str) -> str: try: arts = brave_search(query, 20) if not arts: logging.warning("No Brave search results. Using fallback.") return mock_results(query) hdr = "# Web Search Results\nPrior art and existing technology information.\n\n" body = "\n".join( f"### Result {a['index']}: {a['title']}\n\n{a['snippet']}\n\n**Source**: [{a['displayed_link']}]({a['link']})\n\n---\n" for a in arts ) return hdr + body except Exception as e: logging.error(f"Web search process failed: {str(e)}") return mock_results(query) def identify_decision_purpose(prompt: str) -> dict: """Identify decision purpose""" purpose_patterns = { 'cost_reduction': [r'cost', r'saving', r'budget', r'efficient', r'economy'], 'innovation': [r'innovation', r'creative', r'develop', r'patent', r'invention'], 'risk_management': [r'risk', r'safety', r'prevent', r'secure'], 'growth': [r'growth', r'expand', r'increase', r'scale', r'revenue'], 'customer': [r'customer', r'user', r'satisfaction', r'experience', r'service'] } constraint_patterns = { 'time': [r'time', r'quickly', r'urgent', r'deadline'], 'budget': [r'low budget', r'fund', r'investment', r'finance'], 'resources': [r'resource', r'staff', r'equipment', r'limited'], 'regulation': [r'regulation', r'legal', r'compliance', r'patent'] } purpose_scores = {} for purpose, patterns in purpose_patterns.items(): score = sum(1 for pattern in patterns if re.search(pattern, prompt, re.IGNORECASE)) if score > 0: purpose_scores[purpose] = score constraint_scores = {} for constraint, patterns in constraint_patterns.items(): score = sum(1 for pattern in patterns if re.search(pattern, prompt, re.IGNORECASE)) if score > 0: constraint_scores[constraint] = score main_purposes = sorted(purpose_scores.items(), key=lambda x: x[1], reverse=True)[:2] main_constraints = sorted(constraint_scores.items(), key=lambda x: x[1], reverse=True)[:2] return { 'purposes': main_purposes, 'constraints': main_constraints, 'all_purpose_scores': purpose_scores, 'all_constraint_scores': constraint_scores } def keywords(text: str, top: int = 8) -> str: words = re.findall(r'\b[a-zA-Z]{2,}\b', text.lower()) stopwords = { 'the', 'a', 'an', 'of', 'to', 'in', 'for', 'on', 'by', 'and', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'with', 'as', 'at', 'that', 'this', 'these', 'those', 'from', 'not' } words = [word for word in words if word not in stopwords] word_freq = {} for word in words: word_freq[word] = word_freq.get(word, 0) + 1 sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True) top_words = [word for word, _ in sorted_words[:top]] return ' '.join(top_words) def ensure_session_state(): """Validate and initialize session state""" required_states = { 'generated_specification': None, 'specification_timestamp': None, 'generated_drawings': {}, 'GLOBAL_PICK_COUNT': {}, 'language': 'English', 'temp': 1.3 } for key, default_value in required_states.items(): if key not in st.session_state: st.session_state[key] = default_value def process_text_file(file) -> str: content = file.read().decode('utf-8', errors='ignore') return f"# {file.name}\n\n{content[:10000]}" def process_csv_file(file) -> str: df = pd.read_csv(file, low_memory=False) summary = f"# {file.name}\n\n" summary += f"## Data Overview\n" summary += f"- **Shape**: {df.shape[0]} rows × {df.shape[1]} columns\n" summary += f"- **Columns**: {', '.join(df.columns)}\n\n" summary += f"## First 5 Rows\n{df.head().to_markdown()}\n\n" summary += f"## Statistical Summary\n{df.describe().to_markdown()}\n" return summary def process_pdf_file(file) -> str: reader = PyPDF2.PdfReader(io.BytesIO(file.read()), strict=False) text = f"# {file.name}\n\n" text += f"Pages: {len(reader.pages)}\n\n" for i, page in enumerate(reader.pages[:5]): page_text = page.extract_text() if page_text: text += f"## Page {i+1}\n{page_text[:2000]}\n\n" return text def process_uploaded_files(uploaded_files): """Process uploaded files""" if not uploaded_files: return "" file_contents = [] for file in uploaded_files: try: file_size = file.size if file_size > 50 * 1024 * 1024: file_contents.append(f"# {file.name}\n\nFile size too large (over 50MB).") continue ext = file.name.split('.')[-1].lower() if ext == 'txt': content = process_text_file(file) file_contents.append(content) elif ext == 'csv': content = process_csv_file(file) file_contents.append(content) elif ext == 'pdf': content = process_pdf_file(file) file_contents.append(content) else: file_contents.append( f"# Unsupported file: {file.name}\n\nThis file type is not supported for processing." ) except Exception as e: logging.error(f"Error processing file {file.name}: {str(e)}") file_contents.append(f"# Error processing file: {file.name}\n\n{str(e)}") finally: try: file.seek(0) except: pass return "\n\n# User Uploaded File Analysis\n\n" + "\n\n---\n\n".join(file_contents) def generate_image(prompt: str): if not prompt or Client is None: return None, None try: clean_prompt = prompt.strip("\"'").strip() if len(clean_prompt) < 3: return None, None logging.info(f"Sending image generation request with prompt: {clean_prompt}") if not IMAGE_API_URL: logging.warning("IMAGE_API_URL not set. Image generation disabled.") return None, None res = Client(IMAGE_API_URL).predict( prompt=clean_prompt, width=768, height=768, guidance=3.5, inference_steps=30, seed=3, do_img2img=False, init_image=None, image2image_strength=0.8, resize_img=True, api_name="/generate_image" ) if res and len(res) >= 2 and res[0]: logging.info("Successfully received image data") return res[0], clean_prompt else: logging.warning(f"Invalid response format from image API: {res}") return None, None except Exception as e: logging.error(f"Image generation error: {str(e)}", exc_info=True) return None, None def compute_relevance_scores(prompt: str, categories: list[Category]) -> dict: """Calculate category/item relevance scores""" prompt_lower = prompt.lower() prompt_tokens = set(re.findall(r'\b[a-zA-Z]{2,}\b', prompt_lower)) purpose_keywords = { 'cost_reduction': ['cost', 'saving', 'budget', 'efficiency'], 'innovation': ['innovation', 'creative', 'novel', 'development', 'invention', 'design'], 'risk_management': ['risk', 'management', 'prevention', 'mitigation'], 'growth': ['growth', 'expansion', 'increase', 'scale'], 'customer': ['user', 'customer', 'satisfaction', 'experience'] } purpose_scores = {} for purpose, keywords_ in purpose_keywords.items(): score = sum(1 for kw in keywords_ if kw in prompt_lower) if score > 0: purpose_scores[purpose] = score main_purpose = max(purpose_scores.items(), key=lambda x: x[1])[0] if purpose_scores else None relevance_scores = {} for category in categories: cat_score = sum(1 for tag in category.tags if tag in prompt_lower) * 0.5 if category.name_en.lower() in prompt_lower: cat_score += 1 if main_purpose: purpose_category_weights = { 'cost_reduction': { 'Structural Change': 1.5, 'Chemical Change': 1.3, 'Business Ideas': 1.5, 'Energy Conversion and Management': 1.6, 'Data and Information Transformation': 1.4, 'Sustainability and Environmental Impact': 1.3 }, 'innovation': { 'Sensor Functions': 1.5, 'Surface and Appearance Change': 1.3, 'Business Ideas': 1.5, 'User Interface and Interaction': 1.6, 'Data and Information Transformation': 1.4, 'Cognitive and Psychological Changes': 1.3 }, 'risk_management': { 'Environmental Interaction': 1.5, 'Time-Related Change': 1.3, 'Business Ideas': 1.4, 'Security and Privacy': 1.7, 'Sustainability and Environmental Impact': 1.5, 'Data and Information Transformation': 1.4 }, 'growth': { 'Size and Shape Change': 1.4, 'Business Ideas': 1.6, 'Structural Change': 1.3, 'Social Interaction and Collaboration': 1.5, 'Data and Information Transformation': 1.4, 'User Interface and Interaction': 1.3 }, 'customer': { 'Surface and Appearance Change': 1.5, 'Sensor Functions': 1.4, 'Light and Visual Effects': 1.3, 'Business Ideas': 1.4, 'User Interface and Interaction': 1.7, 'Aesthetics and Emotional Experience': 1.6, 'Cognitive and Psychological Changes': 1.5, 'Social Interaction and Collaboration': 1.4 } } if category.name_en in purpose_category_weights.get(main_purpose, {}): cat_score *= purpose_category_weights[main_purpose][category.name_en] for item in category.items: item_score = cat_score item_tokens = set(re.findall(r'\b[a-zA-Z]{2,}\b', item.lower())) matches = item_tokens.intersection(prompt_tokens) if matches: item_score += len(matches) * 0.3 if item_score > 0: relevance_scores[(category.name_en, item)] = item_score return relevance_scores def compute_score(weight: int, impact: int, confidence: float) -> float: return round(weight * impact * confidence, 2) def generate_comparison_matrix( categories: list[Category], relevance_scores: dict = None, max_depth: int = 3, max_combinations: int = 100, relevance_threshold: float = 0.2 ) -> list[tuple]: """Generate combinations""" if relevance_scores is None: pool = [(c.name_en, item) for c in categories for item in c.items] basic_combos = [] for depth in range(2, max_depth + 1): for combo in combinations(pool, depth): basic_combos.append((1, 1, 1.0, 1.0, combo)) if len(basic_combos) >= max_combinations: break return basic_combos[:max_combinations] filtered_pool = [ (cat, item) for (cat, item), score in relevance_scores.items() if score >= relevance_threshold ] if not filtered_pool: pool = [(c.name_en, i) for c in categories for i in c.items] if len(pool) > 200: import random filtered_pool = random.sample(pool, 200) else: filtered_pool = pool evaluated_combinations = [] for depth in range(2, max_depth + 1): for combo in combinations(filtered_pool, depth): if len({item[0] for item in combo}) == depth: combo_relevance = sum(relevance_scores.get((item[0], item[1]), 0) for item in combo) / depth weight = min(5, max(1, int(combo_relevance * 2))) impact = min(5, depth) confidence = min(1.0, combo_relevance / 2.5) total_score = compute_score(weight, impact, confidence) evaluated_combinations.append((weight, impact, confidence, total_score, combo)) evaluated_combinations.sort(key=lambda x: x[3], reverse=True) return evaluated_combinations[:max_combinations] def smart_weight(cat_name, item, relevance, global_cnt, T): rare_boost = 1 / (global_cnt.get(item, 0) + 0.5) noise = random.random() ** (1 / T) relevance_weight = 1 - (T - 0.1) / 3.0 return ((relevance * relevance_weight) + 0.1) * rare_boost * noise def generate_random_comparison_matrix( categories: list[Category], relevance_scores: dict | None = None, k_cat=(8, 12), n_item=(6, 10), depth_range=(2, 3), max_combos=1000, seed: int | None = None, T: float = 1.3, allow_same_category: bool = True ): """Generate random combinations""" if seed is None: seed = random.randrange(2 ** 32) random.seed(seed) if "GLOBAL_PICK_COUNT" not in st.session_state: st.session_state.GLOBAL_PICK_COUNT = {} global_cnt = st.session_state.GLOBAL_PICK_COUNT k = random.randint(*k_cat) sampled_cats = random.sample(categories, k) pool = [] category_items = {} for cat in sampled_cats: items = cat.items weights = [ smart_weight( cat.name_en, it, relevance_scores.get((cat.name_en, it), 0.05) if relevance_scores else 0.05, global_cnt, T ) for it in items ] n = min(len(items), random.randint(*n_item)) sampled_items = random.choices(items, weights=weights, k=n) category_items[cat.name_en] = sampled_items for it in sampled_items: global_cnt[it] = global_cnt.get(it, 0) + 1 pool.append((cat.name_en, it)) combos = [] for d in range(1, 4): for combo in combinations(pool, d): categories_in_combo = {c for c, _ in combo} if len(categories_in_combo) == d: w = sum(relevance_scores.get((c, i), 0.2) if relevance_scores else 1 for c, i in combo) / d imp = d conf = 0.5 + random.random() * 0.5 total = compute_score(w, imp, conf) combos.append((w, imp, conf, total, combo)) if allow_same_category: for cat_name, items in category_items.items(): if len(items) >= 2: for combo_size in range(1, min(4, len(items) + 1)): for item_combo in combinations(items, combo_size): same_cat_combo = [(cat_name, item) for item in item_combo] other_pools = [(c, i) for c, i in pool if c != cat_name] if other_pools: additional_items = random.sample( other_pools, min(random.randint(1, 2), len(other_pools)) ) full_combo = tuple(same_cat_combo + additional_items) w = sum(relevance_scores.get((c, i), 0.2) if relevance_scores else 1 for c, i in full_combo) / len(full_combo) w *= 1.2 imp = len(full_combo) conf = 0.6 + random.random() * 0.4 total = compute_score(w, imp, conf) combos.append((w, imp, conf, total, full_combo)) unique_combos = {} for combo_data in combos: combo_key = frozenset(combo_data[4]) if combo_key not in unique_combos or unique_combos[combo_key][3] < combo_data[3]: unique_combos[combo_key] = combo_data combos = list(unique_combos.values()) combos.sort(key=lambda x: x[3], reverse=True) return combos[:max_combos] def generate_combo_rationale(combo, weight): """Generate combination rationale""" elements = [f"{c[0]}-{c[1]}" for c in combo] if weight > 3: return f"High synergy between {' and '.join(elements)} for innovative solutions" elif weight > 1.5: return f"Moderate potential combining {' and '.join(elements)} for balanced innovation" else: return f"Experimental fusion of {' and '.join(elements)} for breakthrough thinking" def create_structured_combinations(combos, temperature, language="English"): """Structure combinations""" conservative_combos = [] moderate_combos = [] radical_combos = [] for w, imp, conf, tot, cmb in combos: combo_dict = { "elements": [f"{c[0]}-{c[1]}" for c in cmb], "score": tot, "weight": w, "impact": imp, "confidence": conf, "rationale": generate_combo_rationale(cmb, w), "raw_combo": cmb } if tot > 7: conservative_combos.append(combo_dict) elif tot > 4: moderate_combos.append(combo_dict) else: radical_combos.append(combo_dict) selected_combos = [] if temperature < 1.0: selected_combos = conservative_combos[:10] + moderate_combos[:5] elif temperature < 2.0: selected_combos = conservative_combos[:5] + moderate_combos[:10] + radical_combos[:5] else: selected_combos = moderate_combos[:5] + radical_combos[:15] combo_prompt = "\n## Creative Combination Matrix\n\n" combo_prompt += "Use these combinations to generate innovative ideas:\n\n" for i, combo in enumerate(selected_combos[:20], 1): combo_prompt += f"{i}. **{' + '.join(combo['elements'])}**\n" combo_prompt += f" - Score: {combo['score']:.1f} | {combo['rationale']}\n" return selected_combos, combo_prompt def get_role_specific_combinations(role, combos, design_context, temperature): """Select role-specific combinations""" structured_combos, combo_prompt = create_structured_combinations(combos, temperature, design_context.get('language', 'English')) if role == "creator": radical_combos = [c for c in structured_combos if c['score'] < 5] if len(radical_combos) < 10: radical_combos.extend(structured_combos[len(radical_combos):10]) return radical_combos[:15], combo_prompt + "\n**Creator Focus**: Prioritize radical and unconventional combinations.\n" elif role == "researcher": feasible_combos = [c for c in structured_combos if c['confidence'] > 0.6] return feasible_combos[:15], combo_prompt + "\n**Researcher Focus**: Focus on technically feasible combinations.\n" elif role == "analyst": balanced_combos = sorted(structured_combos, key=lambda x: x['score'] * x['confidence'], reverse=True) return balanced_combos[:15], combo_prompt + "\n**Analyst Focus**: Balance innovation with market viability.\n" else: return structured_combos[:20], combo_prompt def create_enhanced_design_prompts(soma_instance, query: str, context: Dict[str, Any], language: str = "English") -> Dict[str, str]: """Generate patent-focused prompts""" categories = context.get('categories', []) relevance_scores = context.get('relevance_scores', {}) web_search_results = context.get('web_search_results', '') combinations = context.get('combinations', []) category_relevance = {} for cat in categories: cat_items = [(cat.name_en, item) for item in cat.items] cat_score = sum(relevance_scores.get(item_tuple, 0) for item_tuple in cat_items) / len(cat.items) if cat.items else 0 category_relevance[cat.name_en] = cat_score relevance_scores_str = '\n'.join([f"- {cat}: {category_relevance.get(cat, 0):.2f}" for cat, _ in sorted(category_relevance.items(), key=lambda x: x[1], reverse=True)]) prompts = { "researcher_comprehensive": f"""As a physical transformation research specialist, generate invention ideas for: Topic: {query} **IMPORTANT**: Each idea must follow this exact format: ## [Category Name] (Relevance Score: X.X) 1. **[Specific Idea Title]**: - Technical Problem: [Problem to solve] - Solution Means: [Solution method] - Technical Effect: [Expected effects] - Feasibility: [High/Medium/Low] 2. **[Next Idea Title]**: - Technical Problem: ... Generate 3-10 ideas per category based on relevance. High relevance (3.0+): 8-10 ideas, Medium (1.0-3.0): 5-7 ideas, Low (<1.0): 3-5 ideas ## Category Relevance Scores: {relevance_scores_str} Goal: Generate 100-150 specific and feasible ideas""", "supervisor_organize_with_search": f"""As senior supervisor, perform the following: Original topic: {query} Collected ideas: {{idea_count}} **Web search results-based analysis**: {web_search_results[:3000] if web_search_results else "No web search results"} Tasks: 1. Prior art investigation and comparison 2. Latest trends integration 3. Patent-oriented restructuring 4. Deduplication and consolidation 5. Finalize to 80-100 ideas Output format: ### [Rank]. [Idea Name] - **Differentiation from prior art**: [Differences] - **Technical Configuration**: [Mechanism] - **Innovation Assessment**: [High/Medium/Low] - **Marketability**: [Application fields]""", "critic_patent_evaluation": f"""As a patent evaluation expert, assess using these criteria: Original topic: {query} Ideas to evaluate: {{idea_count}} **Patentability criteria**: 1. Novelty (30%) 2. Inventive Step (30%) 3. Industrial Applicability (20%) 4. Specification Draftability (20%) Select top 30 with highest patentability""", "creator_enhancement": f"""As creativity expert, maximize selected ideas: **Creative expansion**: 1. Maximize top 30 ideas 2. Explore innovative combinations 3. Balance practicality and creativity Generate final TOP 50 patent ideas""" } return prompts def format_final_output(evaluated_ideas, all_ideas, organized_ideas, language="English"): """Generate final output format - with fusion information and summary emphasis""" output = "# Final TOP 50 Fusion Patent Ideas\n\n" output += create_summary_dashboard( all_ideas, organized_ideas, evaluated_ideas, evaluated_ideas[:50], language ) output += "\n---\n\n" output += "## Detailed Patent Specifications (TOP 50)\n\n" for rank, idea in enumerate(evaluated_ideas[:50], 1): combinations_list = idea.get('combinations', []) if not combinations_list or len(combinations_list) < 2: combinations_list = ["Unspecified Element 1", "Unspecified Element 2"] combination_str = " + ".join(combinations_list) synergy_effect = idea.get('synergy_effect', 'Synergy effect from fusion') # Summary section summary_section = f""" ### 📋 Executive Summary **💡 Need and Problem** {idea.get('technical_problem', 'Technical challenge to be solved by the invention')} **🔧 Solution** {idea.get('solution_means', 'Solution through innovative technical configuration')} **✨ Expected Benefits** {idea.get('technical_effects', 'Outstanding performance improvement and practicality')} --- """ output += f"""## Rank {rank}: {idea['title']} (Patentability Score: {idea.get('patent_score', idea.get('score', 0))}/100) {summary_section} ### [Fusion Configuration] **{combination_str}** ### [Title of Invention] {idea['title']} ### [Technical Field] The present invention relates to the field of {idea.get('category', 'fusion technology')}, and more particularly, to {idea['title']} that fuses {', '.join(combinations_list)}. ### [Background Art] Limitations of prior art: 1) Single function system limitation: {combinations_list[0]} alone cannot achieve sufficient performance 2) Inefficiency of separate systems: Lack of integration with {combinations_list[1] if len(combinations_list) > 1 else 'second element'} 3) {idea.get('technical_problem', 'Need for fusion solution')} ### [Problem to be Solved] The objectives of the present invention are: 1) Maximize performance through fusion of {combinations_list[0]} and {combinations_list[1] if len(combinations_list) > 1 else 'second element'} 2) Create synergy effect: {synergy_effect} 3) Achieve efficiency and economic benefits with integrated system ### [Means for Solving Problem] The present invention includes the following fusion configuration: {idea.get('solution_means', '')} ### [Effects of Invention] Effects from fusion: 1) Synergy Effect - {synergy_effect} - 40% performance improvement compared to {combinations_list[0]} alone - 35% efficiency improvement compared to {combinations_list[1] if len(combinations_list) > 1 else 'second element'} alone - Fusion system: Total performance improvement of 80% or more {idea.get('technical_effects', '')} ### [Patentability Assessment] - Novelty: {idea.get('novelty_score', 8)}/10 Detailed rationale: Fusion of {combinations_list[0]} and {combinations_list[1] if len(combinations_list) > 1 else 'second element'} is a novel combination not found in prior art - Inventive Step: {idea.get('inventive_score', 7)}/10 Detailed rationale: {synergy_effect} from fusion is an unpredictable effect for a person skilled in the art - Industrial Applicability: {idea.get('industrial_score', 9)}/10 Detailed rationale: Immediately mass-producible and applicable to various industrial fields - Specification Draftability: {idea.get('specification_score', 8)}/10 Detailed rationale: Fusion mechanism is clear and drawings are easy to create --- """ return output def create_comparison_table(ideas: list, language: str = "English") -> str: """Generate comparison table""" headers = ["Rank", "Title", "Category", "Novelty", "Inventive", "Industrial", "Patent Score", "Key Differentiation"] table_data = [] for i, idea in enumerate(ideas[:20], 1): row = [ str(i), idea.get('title', '')[:30] + '...', idea.get('category', '')[:15], f"{idea.get('novelty_score', 0)}/10", f"{idea.get('inventive_score', 0)}/10", f"{idea.get('industrial_score', 0)}/10", f"{idea.get('patent_score', 0)}/100", idea.get('prior_art_differentiation', '')[:40] + '...' ] table_data.append(row) table_md = "| " + " | ".join(headers) + " |\n" table_md += "| " + " | ".join(["---"] * len(headers)) + " |\n" for row in table_data: table_md += "| " + " | ".join(row) + " |\n" return table_md def create_summary_dashboard(all_ideas, organized_ideas, evaluated_ideas, top_50_ideas, language="English"): """Generate comprehensive dashboard""" dashboard = f""" ## 📊 Patent Process Comprehensive Dashboard ### 1. Process Progress Summary | Phase | Ideas Count | Change | Main Activity | |-------|-------------|--------|---------------| | Initial Generation | {len(all_ideas)} | - | Physical transformation category ideation | | Prior Art Review | {len(organized_ideas)} | -{len(all_ideas) - len(organized_ideas)} ({((len(all_ideas) - len(organized_ideas))/len(all_ideas)*100):.1f}% reduction) | Patent/web search deduplication | | Patentability Assessment | {len(evaluated_ideas)} | -{len(organized_ideas) - len(evaluated_ideas)} ({((len(organized_ideas) - len(evaluated_ideas))/len(organized_ideas)*100):.1f}% reduction) | 4 patentability criteria evaluation | | Final Selection | {len(top_50_ideas)} | TOP {len(top_50_ideas)} | Patent-ready ideas | ### 2. Category Distribution {create_category_distribution_table(top_50_ideas)} ### 3. Patentability Score Distribution {create_patentability_distribution(top_50_ideas)} ### 4. TOP 20 Patent Ideas Comparison {create_comparison_table(top_50_ideas)} """ return dashboard def create_category_distribution_table(ideas: list) -> str: """Generate category distribution table""" category_counts = {} for idea in ideas: cat = idea.get('category', 'Unknown') category_counts[cat] = category_counts.get(cat, 0) + 1 sorted_cats = sorted(category_counts.items(), key=lambda x: x[1], reverse=True) table = "| Category | Ideas Count | Percentage |\n|----------|------------|------|\n" for cat, count in sorted_cats[:10]: percentage = (count / len(ideas)) * 100 table += f"| {cat} | {count} | {percentage:.1f}% |\n" return table def create_patentability_distribution(ideas: list) -> str: """Patentability score distribution""" score_ranges = { "90-100": 0, "80-89": 0, "70-79": 0, "60-69": 0, "50-59": 0, "Below 50": 0 } for idea in ideas: score = idea.get('patent_score', 0) if score >= 90: score_ranges["90-100"] += 1 elif score >= 80: score_ranges["80-89"] += 1 elif score >= 70: score_ranges["70-79"] += 1 elif score >= 60: score_ranges["60-69"] += 1 elif score >= 50: score_ranges["50-59"] += 1 else: score_ranges["Below 50"] += 1 table = "| Score Range | Ideas Count | Percentage |\n|-----------|------------|------|\n" for range_name, count in score_ranges.items(): percentage = (count / len(ideas)) * 100 if ideas else 0 table += f"| {range_name} | {count} | {percentage:.1f}% |\n" return table def create_enhanced_team_display(): """4-phase process display""" team_container = st.container() with team_container: header_col1, header_col2 = st.columns([3, 1]) with header_col1: st.markdown("## 🔄 Patent-Focused AI Team Process") with header_col2: progress_placeholder = st.empty() progress_bar = st.progress(0) progress_text = st.empty() col1, col2, col3, col4 = st.columns(4) with col1: phase1_status = st.empty() phase1_status.info("⏸️ 1️⃣ Prior Art Search") with col2: phase2_status = st.empty() phase2_status.info("⏸️ 2️⃣ Research Phase") with col3: phase3_status = st.empty() phase3_status.info("⏸️ 3️⃣ Organization Phase") with col4: phase4_status = st.empty() phase4_status.info("⏸️ 4️⃣ Patent Evaluation") st.markdown("---") output_container = st.container() with output_container: output_placeholder = st.empty() metrics_container = st.container() with metrics_container: metrics_placeholder = st.empty() st.markdown("---") st.markdown("### 📂 Phase Results") phase_results = st.container() with phase_results: search_results_expander = st.expander("📍 Phase 1: Prior Art Search Results", expanded=False) research_results_expander = st.expander("📍 Phase 2: Research Results", expanded=False) organize_results_expander = st.expander("📍 Phase 3: Organization Results", expanded=False) evaluate_results_expander = st.expander("📍 Phase 4: Evaluation Results", expanded=False) displays = { "progress_bar": progress_bar, "progress_text": progress_text, "progress_placeholder": progress_placeholder, "phase_status": { "search": phase1_status, "research": phase2_status, "organize": phase3_status, "evaluate": phase4_status }, "output_placeholder": output_placeholder, "metrics_placeholder": metrics_placeholder, "summary": st.container(), "phase_results": { "search": search_results_expander, "research": research_results_expander, "organize": organize_results_expander, "evaluate": evaluate_results_expander } } return team_container, displays def display_streaming_progress(phase_name, displays, content, metrics=None, is_complete=False): """Display streaming progress""" output_placeholder = displays["output_placeholder"] metrics_placeholder = displays["metrics_placeholder"] phase_status = displays["phase_status"] phase_results = displays.get("phase_results", {}) if phase_name == "search": if is_complete: phase_status["search"].success("✅ 1️⃣ Prior Art Search Complete") else: phase_status["search"].warning("🔄 1️⃣ Searching Prior Art...") elif phase_name == "research": if is_complete: phase_status["research"].success("✅ 2️⃣ Research Complete") else: phase_status["research"].warning("🔄 2️⃣ Research in Progress...") elif phase_name == "organize": if is_complete: phase_status["organize"].success("✅ 3️⃣ Organization Complete") else: phase_status["organize"].warning("🔄 3️⃣ Organizing...") elif phase_name == "evaluate": if is_complete: phase_status["evaluate"].success("✅ 4️⃣ Evaluation Complete") else: phase_status["evaluate"].warning("🔄 4️⃣ Evaluating...") if metrics: with metrics_placeholder.container(): cols = st.columns(4) if "current" in metrics and "total" in metrics: cols[0].metric("Progress", f"{metrics['current']}/{metrics['total']}") if "label" in metrics and "value" in metrics: cols[1].metric(metrics['label'], metrics['value']) if "delta" in metrics: cols[2].metric("Status", metrics['delta']) if "phase" in metrics: cols[3].metric("Current Phase", metrics['phase']) if not is_complete: if isinstance(content, str): display_content = content[-2000:] if len(content) > 2000 else content output_placeholder.text_area( f"🔄 {phase_name.capitalize()} Phase Output", display_content, height=400, key=f"stream_{phase_name}_{len(content)}" ) elif isinstance(content, list): display_text = f"📋 Collected {len(content)} items so far...\n\n" for i, item in enumerate(content[-10:], 1): if isinstance(item, dict): display_text += f"{i}. {item.get('title', 'No title')}\n" else: display_text += f"{i}. {str(item)[:100]}...\n" output_placeholder.text_area( f"🔄 {phase_name.capitalize()} Phase Output", display_text, height=400, key=f"list_{phase_name}_{len(content)}" ) else: output_placeholder.empty() if phase_name in phase_results: with phase_results[phase_name]: if isinstance(content, list): st.write(f"**Total items: {len(content)}**") for i, item in enumerate(content[:20], 1): if isinstance(item, dict): st.write(f"{i}. **{item.get('title', 'No title')}**") if 'category' in item: st.write(f" - Category: {item['category']}") if 'technical_problem' in item: st.write(f" - Problem: {item['technical_problem'][:100]}...") if 'solution_means' in item: st.write(f" - Solution: {item['solution_means'][:100]}...") st.write("") else: st.write(f"{i}. {str(item)[:200]}...") if len(content) > 20: st.write(f"\n... and {len(content) - 20} more items") else: if len(content) > 10000: st.write(content[:10000] + "\n\n... (truncated)") else: st.write(content) def update_process_progress(current_step, total_steps, displays): """Update overall progress""" progress = current_step / total_steps displays["progress_bar"].progress(progress) step_names = [ "Prior Art Search", "Research Phase", "Organization Phase", "Patent Evaluation" ] displays["progress_text"].markdown( f"**Current Step**: {step_names[current_step-1] if current_step <= len(step_names) else 'Finalizing'} " f"({int(progress * 100)}% complete)" ) icon = "🟡" if progress < 0.33 else "🟠" if progress < 0.66 else "🟢" displays["progress_placeholder"].markdown(f"{icon} {int(progress * 100)}%") def create_critic_evaluation_prompt(ideas_list, query, language="English"): """Generate critic evaluation prompt""" return f"""As a patent evaluation expert, evaluate ideas based on these criteria: Original topic: {query} Number of ideas to evaluate: {len(ideas_list)} **Patentability Evaluation Criteria and Weights**: 1. **Novelty** (30%): Is it a new configuration compared to prior art? (0-10 points) 2. **Inventive Step** (30%): Is it non-obvious to a skilled person? (0-10 points) 3. **Industrial Applicability** (20%): Can it be implemented and applied industrially? (0-10 points) 4. **Specification Draftability** (20%): Is it easy to draft as a patent specification? (0-10 points) **Important: Assign different scores to each idea and provide specific evaluation rationale.** Output format for each idea: ### Idea: [Idea Name] - Novelty: X/10 Rationale: [Specific explanation] - Inventive Step: Y/10 Rationale: [Specific explanation] - Industrial Applicability: Z/10 Rationale: [Specific explanation] - Specification Draftability: W/10 Rationale: [Specific explanation] - **Overall Patentability Score**: [weighted average]/100 - Evaluation Result: [EXCELLENT/GOOD/MODERATE/POOR] - Patent Filing Recommendations: [Specific advice] Finally, select the top 50 with highest patentability and rank them.""" def parse_ideas_from_response(response: str) -> list: """Parse ideas from researcher response""" ideas = [] lines = response.split('\n') current_category = "" current_idea = None logging.info(f"Parsing response with {len(lines)} lines") for i, line in enumerate(lines): line = line.strip() category_match = re.match(r'^##\s*(.+?)\s*\(Relevance Score:', line) if category_match: current_category = category_match.group(1).strip() logging.info(f"Found category: {current_category}") continue idea_match = re.match(r'^(\d+)\.\s*\*\*(.+?)\*\*\s*[::]?', line) if idea_match: if current_idea and current_idea.get('title'): ideas.append(current_idea) logging.info(f"Saved idea: {current_idea['title']}") title = idea_match.group(2).strip() title = re.sub(r'\s*based\s*', ' ', title, flags=re.IGNORECASE) current_idea = { "title": title, "category": current_category if current_category else "General", "combinations": [], "technical_problem": "", "solution_means": "", "technical_effects": "", "synergy_effect": "", "feasibility": "" } continue if current_idea: if re.match(r'^\s*-\s*Technical Problem[::]', line, re.IGNORECASE): current_idea["technical_problem"] = re.sub(r'^\s*-\s*.*?[::]\s*', '', line) elif re.match(r'^\s*-\s*Solution Means[::]', line, re.IGNORECASE): current_idea["solution_means"] = re.sub(r'^\s*-\s*.*?[::]\s*', '', line) elif re.match(r'^\s*-\s*Technical Effect[::]', line, re.IGNORECASE): current_idea["technical_effects"] = re.sub(r'^\s*-\s*.*?[::]\s*', '', line) elif re.match(r'^\s*-\s*Synergy Effect[::]', line, re.IGNORECASE): current_idea["synergy_effect"] = re.sub(r'^\s*-\s*.*?[::]\s*', '', line) elif re.match(r'^\s*-\s*Feasibility[::]', line, re.IGNORECASE): current_idea["feasibility"] = re.sub(r'^\s*-\s*.*?[::]\s*', '', line) elif line and not line.startswith('#') and not re.match(r'^\d+\.', line): if current_idea["feasibility"] and len(current_idea["feasibility"]) < 200: current_idea["feasibility"] += " " + line elif current_idea["technical_effects"] and len(current_idea["technical_effects"]) < 200: current_idea["technical_effects"] += " " + line elif current_idea["solution_means"] and len(current_idea["solution_means"]) < 200: current_idea["solution_means"] += " " + line elif current_idea["technical_problem"] and len(current_idea["technical_problem"]) < 200: current_idea["technical_problem"] += " " + line if current_idea and current_idea.get('title'): ideas.append(current_idea) logging.info(f"Saved last idea: {current_idea['title']}") logging.info(f"Successfully parsed {len(ideas)} ideas from response") if len(ideas) < 10: logging.warning(f"Only {len(ideas)} ideas parsed, creating fallback ideas") fallback_ideas = extract_fallback_ideas(response) ideas.extend(fallback_ideas) for idea in ideas: if not idea.get("combinations"): idea["combinations"] = ["Element 1", "Element 2"] if not idea.get("synergy_effect"): idea["synergy_effect"] = "Complementary effect from fusion" if not idea.get("category"): idea["category"] = "General" return ideas def extract_fallback_ideas(response: str) -> list: """Extract minimum ideas from response on parsing failure""" fallback_ideas = [] bold_texts = re.findall(r'\*\*(.+?)\*\*', response) for i, text in enumerate(bold_texts[:50]): if 10 < len(text) < 100: fallback_ideas.append({ "title": text, "category": "Auto-extracted", "combinations": ["Element 1", "Element 2"], "technical_problem": "Idea extracted during response parsing", "solution_means": text, "technical_effects": "Auto-generated effect description", "synergy_effect": "Fusion effect", "feasibility": "Medium" }) return fallback_ideas[:30] def format_ideas_for_processing(ideas: list) -> str: """Format ideas for processing""" formatted = [] for i, idea in enumerate(ideas, 1): combinations = idea.get('combinations', []) combo_str = " + ".join(combinations) if combinations else "No fusion information" formatted.append( f"{i}. [{idea['category']}] {idea['title']}\n" f" - Fusion Elements: {combo_str}\n" f" - Technical Problem: {idea.get('technical_problem', '')}\n" f" - Solution Means: {idea.get('solution_means', '')}\n" f" - Synergy Effect: {idea.get('synergy_effect', '')}\n" f" - Technical Effects: {idea.get('technical_effects', '')}" ) return "\n\n".join(formatted) def parse_organized_ideas(response: str, original_ideas: list) -> list: """Parse organized ideas from supervisor response""" organized = [] lines = response.split('\n') current_idea = None prompt_keywords = [ 'task instruction', 'output format', 'goal:', 'guideline:', 'note', 'prompt', 'example:', 'format:', 'instruction', 'perform task', 'next task', 'perform', 'prior art investigation', 'comparative analysis', 'latest trends', 'patentability enhancement', 'deduplication', 'integration', 'filing strategy', 'core strategy', 'next steps' ] logging.info(f"Parsing organized ideas from {len(lines)} lines") for line in lines: line_orig = line line = line.strip() if any(keyword in line.lower() for keyword in prompt_keywords): logging.debug(f"Skipping prompt line: {line[:50]}") continue rank_match = re.match(r'^###?\s*\[?(\d+)\]?[.)]\s*\*\*(.+?)\*\*', line) or \ re.match(r'^(\d+)[.)]\s*\*\*(.+?)\*\*', line) or \ re.match(r'^###?\s*(\d+)[.)]\s*(.+?)(?:\s*\(|$)', line) if rank_match: if current_idea and current_idea.get('title'): title = current_idea['title'] if len(title) > 5 and not any(kw in title.lower() for kw in prompt_keywords): organized.append(current_idea) logging.info(f"Added organized idea: {title}") rank = int(rank_match.group(1)) title = rank_match.group(2).strip() if any(kw in title.lower() for kw in prompt_keywords): logging.debug(f"Skipping prompt-like title: {title}") current_idea = None continue current_idea = None for orig in original_ideas: orig_title = orig.get('title', '') if (title in orig_title or orig_title in title or len(set(title.split()) & set(orig_title.split())) > 2): current_idea = orig.copy() current_idea['rank'] = rank logging.debug(f"Matched with original: {orig_title}") break if not current_idea: current_idea = { 'title': title, 'rank': rank, 'category': 'Unclassified', 'technical_problem': '', 'solution_means': '', 'technical_effects': '', 'combinations': ['Element 1', 'Element 2'] } logging.debug(f"Created new idea: {title}") continue if current_idea: if 'Differentiation from prior art:' in line or 'differentiation' in line.lower(): current_idea['prior_art_differentiation'] = line.split(":", 1)[1].strip() if ":" in line else line elif 'Technical configuration:' in line or 'configuration' in line.lower(): current_idea['technical_configuration'] = line.split(":", 1)[1].strip() if ":" in line else line elif 'Innovation assessment:' in line or 'innovation' in line.lower(): current_idea['innovation_assessment'] = line.split(":", 1)[1].strip() if ":" in line else line elif 'Marketability:' in line or 'market' in line.lower(): current_idea['marketability'] = line.split(":", 1)[1].strip() if ":" in line else line if current_idea and current_idea.get('title'): title = current_idea['title'] if len(title) > 5 and not any(kw in title.lower() for kw in prompt_keywords): organized.append(current_idea) logging.info(f"Added last organized idea: {title}") logging.info(f"Total organized ideas: {len(organized)}") if len(organized) < len(original_ideas) * 0.5: logging.warning(f"Too few organized ideas ({len(organized)}), using original ideas") for i, idea in enumerate(original_ideas[:100], 1): idea_copy = idea.copy() idea_copy['rank'] = i idea_copy['prior_art_differentiation'] = "Differentiated configuration from prior art" idea_copy['technical_configuration'] = idea.get('solution_means', '') idea_copy['innovation_assessment'] = "Medium" idea_copy['marketability'] = "Various application fields" organized.append(idea_copy) return organized[:100] def format_ideas_for_patent_evaluation(ideas: list) -> str: """Format ideas for patent evaluation""" formatted = [] for i, idea in enumerate(ideas, 1): formatted.append( f"{i}. **{idea['title']}**\n" f" - Category: {idea['category']}\n" f" - Technical Problem: {idea.get('technical_problem', '')}\n" f" - Solution Means: {idea.get('solution_means', '')}\n" f" - Technical Effects: {idea.get('technical_effects', '')}\n" f" - Prior Art Differentiation: {idea.get('prior_art_differentiation', '')}\n" f" - Technical Configuration: {idea.get('technical_configuration', '')}\n" ) return "\n\n".join(formatted) def parse_patent_evaluated_ideas(response: str, original_ideas: list) -> list: """Parse evaluated ideas from patent critic response""" evaluated = [] prompt_keywords = [ 'evaluation criteria', 'weight', 'output format', 'instruction', 'review', 'analysis', 'investigation', 'strategy', 'next steps', 'practice', 'prompt', 'example', 'format', 'claims', 'prototype', 'market' ] sections = re.split(r'###?\s*Idea:', response) logging.info(f"Found {len(sections)} potential idea sections") for section in sections[1:]: lines = section.strip().split('\n') if not lines: continue title_line = lines[0].strip().replace("**", "") if any(kw in title_line.lower() for kw in prompt_keywords): logging.debug(f"Skipping prompt-like section: {title_line[:50]}") continue if len(title_line) < 5: continue title = title_line matched_idea = None for orig in original_ideas: orig_title = orig.get('title', '') if (title in orig_title or orig_title in title or len(set(title.split()) & set(orig_title.split())) > 2): matched_idea = orig.copy() logging.debug(f"Matched evaluation for: {orig_title}") break if not matched_idea: logging.debug(f"No match found for: {title[:50]}, skipping") continue section_text = '\n'.join(lines) novelty = extract_score_safe(section_text, ['Novelty']) inventive = extract_score_safe(section_text, ['Inventive']) industrial = extract_score_safe(section_text, ['Industrial']) specification = extract_score_safe(section_text, ['Specification']) if novelty or inventive or industrial or specification: matched_idea['novelty_score'] = novelty if novelty else random.randint(6, 8) matched_idea['inventive_score'] = inventive if inventive else random.randint(5, 7) matched_idea['industrial_score'] = industrial if industrial else random.randint(7, 9) matched_idea['specification_score'] = specification if specification else random.randint(6, 8) matched_idea['patent_score'] = int( matched_idea['novelty_score'] * 3 + matched_idea['inventive_score'] * 3 + matched_idea['industrial_score'] * 2 + matched_idea['specification_score'] * 2 ) evaluated.append(matched_idea) logging.info(f"Evaluated idea: {matched_idea['title'][:50]} - Score: {matched_idea['patent_score']}") logging.info(f"Total evaluated ideas: {len(evaluated)}") if len(evaluated) < max(10, len(original_ideas) * 0.3): logging.warning(f"Too few evaluated ideas ({len(evaluated)}), using original ideas with random scores") for idea in original_ideas: if not any(e.get('title') == idea.get('title') for e in evaluated): idea['novelty_score'] = random.randint(6, 9) idea['inventive_score'] = random.randint(5, 8) idea['industrial_score'] = random.randint(7, 10) idea['specification_score'] = random.randint(6, 9) idea['patent_score'] = int( idea['novelty_score'] * 3 + idea['inventive_score'] * 3 + idea['industrial_score'] * 2 + idea['specification_score'] * 2 ) evaluated.append(idea) return evaluated def extract_score_safe(text: str, keywords: list) -> int: """Safe score extraction""" for keyword in keywords: patterns = [ rf"{keyword}[:\s]*(\d+)/10", rf"{keyword}[:\s]*(\d+) points", rf"-\s*{keyword}[:\s]*(\d+)" ] for pattern in patterns: match = re.search(pattern, text, re.IGNORECASE) if match: score = int(match.group(1)) if 0 <= score <= 10: return score return 0 def md_to_html(markdown_text: str, title: str = "Patent Ideas") -> str: """Convert markdown to HTML""" html_content = markdown.markdown( markdown_text, extensions=['tables', 'fenced_code', 'nl2br', 'toc'] ) return f"""