#!/usr/bin/env python3 # -*- coding: utf-8 -*- import streamlit as st import pandas as pd import sys import os import json import re import logging import requests import markdown import time import io import random import hashlib from datetime import datetime from typing import Iterator, List, Dict, Any, Generator, Tuple from itertools import combinations import urllib.parse from dataclasses import dataclass import PyPDF2 from collections import Counter from docx import Document from docx.shared import Pt, Mm from docx.enum.text import WD_ALIGN_PARAGRAPH import numpy as np import matplotlib.pyplot as plt import matplotlib.patches as patches from matplotlib.patches import FancyBboxPatch, Rectangle, Circle, Arrow, Polygon import matplotlib.font_manager as fm import graphviz from tempfile import NamedTemporaryFile import base64 # Gradio Client for image generation try: from gradio_client import Client except ImportError: Client = None logging.warning("gradio_client not installed. Image generation will be disabled.") # Friendli AI imports try: from openai import OpenAI, APIError, APITimeoutError except ImportError: logging.warning("openai package not installed.") OpenAI = None APIError = Exception APITimeoutError = Exception import tempfile import glob import shutil # Additional libraries try: import pyarrow.parquet as pq except ImportError: logging.warning("pyarrow not installed. Parquet file reading will be disabled.") pq = None try: from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity SKLEARN_AVAILABLE = True except ImportError: logging.warning("scikit-learn not installed. 
Some features will be disabled.") TfidfVectorizer = None cosine_similarity = None SKLEARN_AVAILABLE = False # Network stability libraries try: import httpx from httpx import RemoteProtocolError except ImportError: logging.warning("httpx not installed.") httpx = None RemoteProtocolError = Exception # Backoff fallback try: import backoff except ImportError: logging.warning("`backoff` module is missing. Using a simple fallback decorator.") def _simple_backoff_on_exception(exceptions, *args, **kwargs): max_tries = kwargs.get("max_tries", 3) base = kwargs.get("base", 2) def decorator(fn): def wrapper(*f_args, **f_kwargs): attempt = 0 while True: try: return fn(*f_args, **f_kwargs) except exceptions as e: attempt += 1 if attempt >= max_tries: raise sleep = base ** attempt logging.info(f"[retry {attempt}/{max_tries}] {fn.__name__} -> {e} … waiting {sleep}s") time.sleep(sleep) return wrapper return decorator class _DummyBackoff: on_exception = _simple_backoff_on_exception backoff = _DummyBackoff() # Streamlit page configuration st.set_page_config( page_title="Ilúvatar: Patent-Focused Creative Design & Invention AI", layout="wide", initial_sidebar_state="expanded" ) # Environment Variables FIREWORKS_API_KEY = os.getenv("FIREWORKS_API_KEY", "") FIREWORKS_API_URL = os.getenv("FIREWORKS_API_URL", "") FIREWORKS_MODEL = os.getenv("FIREWORKS_MODEL", "") BRAVE_KEY = os.getenv("BAPI_TOKEN", "") SERPHOUSE_KEY = os.getenv("SERPHOUSE_API_KEY", "") IMAGE_API_URL = os.getenv("IMAGE_API_URL") MAX_TOKENS = 7999 BRAVE_ENDPOINT = "https://api.search.brave.com/res/v1/web/search" # Logging setup LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") logging.basicConfig( level=getattr(logging, LOG_LEVEL.upper(), logging.INFO), format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) # SOMA System Integration class SOMASystem: """Self-Orchestrating Modular Architect System""" def __init__(self, api_key: str): self.api_key = api_key self.api_url = FIREWORKS_API_URL self.model = FIREWORKS_MODEL 
self.test_mode = not api_key def create_headers(self): return { "Accept": "application/json", "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}" } def call_llm_streaming(self, messages: List[Dict[str, str]], role: str, max_tokens: int = 4096, language: str = "English") -> Generator[str, None, None]: """Streaming LLM API call using Fireworks AI""" if self.test_mode: test_response = f"[{role.upper()}] This is a test response for {role} AI." yield from self.simulate_streaming(test_response, role) return try: system_prompts = self.get_system_prompts(language) full_messages = [ {"role": "system", "content": system_prompts.get(role, "")}, *messages ] payload = { "model": self.model, "messages": full_messages, "max_tokens": max_tokens, "top_p": 1, "top_k": 40, "presence_penalty": 0, "frequency_penalty": 0, "temperature": 0.6, "stream": True } logging.info(f"Making API request to {self.api_url} for role {role}") response = requests.post( self.api_url, headers=self.create_headers(), data=json.dumps(payload), stream=True, timeout=60 ) if response.status_code != 200: error_msg = f"❌ API error ({response.status_code}): {response.text[:200]}" logging.error(error_msg) yield error_msg return buffer = "" chunk_count = 0 for line in response.iter_lines(): if line: line = line.decode('utf-8') if line.startswith("data: "): data = line[6:] if data == "[DONE]": if buffer: yield buffer break try: chunk = json.loads(data) if "choices" in chunk and chunk["choices"]: content = chunk["choices"][0].get("delta", {}).get("content", "") if content: buffer += content chunk_count += 1 if len(buffer) > 20 or '\n' in buffer or chunk_count % 5 == 0: yield buffer buffer = "" except json.JSONDecodeError as e: logging.warning(f"JSON decode error: {e} for line: {line[:100]}") continue if buffer: yield buffer except requests.exceptions.Timeout: error_msg = "❌ Request timeout - API response taking too long" logging.error(error_msg) yield error_msg except Exception as e: 
error_msg = f"❌ Error occurred: {str(e)}" logging.error(f"Error during streaming: {str(e)}", exc_info=True) yield error_msg def simulate_streaming(self, text: str, role: str) -> Generator[str, None, None]: """Simulate streaming in test mode""" words = text.split() chunk_size = 5 for i in range(0, len(words), chunk_size): chunk = " ".join(words[i:i+chunk_size]) yield chunk + " " time.sleep(0.02) def get_system_prompts(self, language: str = "English") -> Dict[str, str]: """Get system prompts for each AI role""" return { "supervisor": "You are a senior researcher directing inventions from a patent strategy perspective. You provide strategic guidance for prior art research and patentability enhancement.", "critic": "You are a patent expert evaluating inventions from a patent examiner's perspective. You strictly assess novelty, inventive step, and industrial applicability, and review patent specification draftability.", "creator": "You are an inventor creating patentable innovative inventions. You present creative and feasible solutions that overcome limitations of existing technology.", "researcher": "You are a professional researcher investigating patent information and prior art. You systematically analyze related patents, technology trends, and market needs.", "analyst": "You are a senior analyst building patent portfolios. You comprehensively evaluate the patentability, marketability, and technical value of inventions." 
} # ============================================================================ # ENHANCED SOMA COLLABORATION SYSTEM # ============================================================================ @dataclass class AgentMessage: """Agent communication message structure""" from_agent: str to_agent: str message_type: str content: str context: Dict[str, Any] timestamp: float @dataclass class CollaborationResult: """Collaboration result""" final_output: str collaboration_rounds: int agent_contributions: Dict[str, List[str]] fact_checks: List[Dict] consensus_score: float class EnhancedSOMASystem(SOMASystem): """Enhanced SOMA System - Autonomous collaboration and fact-checking""" def __init__(self, api_key: str): super().__init__(api_key) self.message_queue: List[AgentMessage] = [] self.collaboration_history: List[Dict] = [] self.fact_check_cache: Dict[str, bool] = {} def autonomous_collaboration( self, task: str, initial_data: Dict, max_rounds: int = 3, min_consensus: float = 0.8, language: str = "English" ) -> CollaborationResult: """Autonomous multi-agent collaboration""" collaboration_rounds = 0 agent_contributions = { "researcher": [], "supervisor": [], "critic": [], "creator": [], "analyst": [] } fact_checks = [] current_output = initial_data consensus_score = 0.0 for round_num in range(max_rounds): collaboration_rounds += 1 logging.info(f"Collaboration Round {round_num + 1}/{max_rounds}") round_results = {} # Researcher researcher_output = self._agent_work_with_questions( "researcher", task, current_output, language ) round_results["researcher"] = researcher_output agent_contributions["researcher"].append(researcher_output["output"]) # Creator creator_output = self._agent_work_with_questions( "creator", task, {**current_output, "research": researcher_output["output"]}, language ) round_results["creator"] = creator_output agent_contributions["creator"].append(creator_output["output"]) # Critic critic_output = self._critical_review_with_factcheck( task, 
{**current_output, "research": researcher_output["output"], "ideas": creator_output["output"]}, language ) round_results["critic"] = critic_output agent_contributions["critic"].append(critic_output["output"]) fact_checks.extend(critic_output.get("fact_checks", [])) # Supervisor supervisor_output = self._supervisor_synthesis( task, round_results, language ) round_results["supervisor"] = supervisor_output agent_contributions["supervisor"].append(supervisor_output["output"]) # Analyst analyst_output = self._analyst_evaluation( task, round_results, language ) round_results["analyst"] = analyst_output agent_contributions["analyst"].append(analyst_output["output"]) # Calculate consensus score consensus_score = self._calculate_consensus(round_results) logging.info(f"Round {round_num + 1} Consensus: {consensus_score:.2f}") if consensus_score >= min_consensus: logging.info(f"Consensus reached at round {round_num + 1}") break current_output = self._merge_round_results(round_results) final_output = self._generate_final_output( current_output, agent_contributions, language ) return CollaborationResult( final_output=final_output, collaboration_rounds=collaboration_rounds, agent_contributions=agent_contributions, fact_checks=fact_checks, consensus_score=consensus_score ) def _agent_work_with_questions( self, agent_role: str, task: str, context: Dict, language: str ) -> Dict: """Agent performs work including asking questions""" prompt = self._create_collaborative_prompt( agent_role, task, context, language ) output = "" questions_for_others = [] for chunk in self.call_llm_streaming( [{"role": "user", "content": prompt}], agent_role, max_tokens=4096, language=language ): output += chunk questions_for_others = self._extract_questions(output) answers = {} if questions_for_others: answers = self._get_answers_from_agents( agent_role, questions_for_others, context, language ) return { "output": output, "questions": questions_for_others, "answers_received": answers } def 
_critical_review_with_factcheck( self, task: str, context: Dict, language: str ) -> Dict: """Critical review with fact-checking""" prompt = f"""As a critical expert, rigorously review the following content and perform fact-checks: Task: {task} Provided Information: {json.dumps(context, ensure_ascii=False, indent=2)[:3000]} Please verify the following: 1. **Fact Check**: - Is there evidence for all claims? - Are statistics or numbers accurate? - Are cited sources credible? 2. **Logical Consistency**: - Are claims logically connected to evidence? - Are there any contradictions? 3. **Completeness**: - Is any important information missing? - Have alternative views been considered? 4. **Patentability Perspective**: - Is there sufficient evidence for novelty? - Are inventive step claims valid? For each item: - ✅ Verified: [evidence] - ⚠️ Questionable: [reason] - ❌ Error: [issue] Provide your assessment in this format.""" output = "" for chunk in self.call_llm_streaming( [{"role": "user", "content": prompt}], "critic", max_tokens=5000, language=language ): output += chunk fact_checks = self._parse_fact_checks(output) verified_facts = self._verify_with_web_search(fact_checks, context) return { "output": output, "fact_checks": fact_checks, "verified_facts": verified_facts } def _supervisor_synthesis( self, task: str, round_results: Dict, language: str ) -> Dict: """Supervisor synthesizes all agent results""" prompt = f"""As a supervisor, synthesize the following agent work results: Task: {task} Agent Results: {json.dumps({k: v.get('output', '')[:500] for k, v in round_results.items()}, ensure_ascii=False, indent=2)} Please perform the following: 1. **Integrate Strengths**: Identify and integrate excellent contributions from each agent 2. **Address Weaknesses**: Supplement deficiencies with other agents' results 3. **Resolve Conflicts**: Reconcile conflicting opinions between agents 4. **Set Priorities**: Select the most important ideas/improvements 5. 
**Suggest Next Steps**: Propose additional work needed Organize the synthesis results systematically.""" output = "" for chunk in self.call_llm_streaming( [{"role": "user", "content": prompt}], "supervisor", max_tokens=5000, language=language ): output += chunk return {"output": output} def _analyst_evaluation( self, task: str, round_results: Dict, language: str ) -> Dict: """Analyst evaluates collaboration quality""" prompt = f"""As an analysis expert, evaluate the quality of this collaboration round: Task: {task} Agent Results: {json.dumps({k: v.get('output', '')[:500] for k, v in round_results.items()}, ensure_ascii=False, indent=2)} Evaluate based on the following criteria (each out of 10 points): 1. **Completeness**: Degree of task goal achievement 2. **Originality**: New and innovative ideas 3. **Feasibility**: Actual implementation possibility 4. **Patentability**: Patent filing possibility 5. **Collaboration Synergy**: Effectiveness of inter-agent cooperation Provide the score and detailed rationale for each item, **Overall Score (out of 50)** and **Improvement directions for next round**.""" output = "" for chunk in self.call_llm_streaming( [{"role": "user", "content": prompt}], "analyst", max_tokens=4096, language=language ): output += chunk scores = self._extract_scores(output) return { "output": output, "scores": scores } def _create_collaborative_prompt( self, agent_role: str, task: str, context: Dict, language: str ) -> str: """Generate prompt for collaboration""" # Convert Category objects to JSON serializable format serializable_context = {} for key, value in context.items(): if key == 'categories': # Convert Category list to dictionary list serializable_context[key] = [ { 'name_en': cat.name_en, 'tags': cat.tags, 'items': cat.items[:5] # Only first 5 items } for cat in value[:10] # Only first 10 categories ] elif key == 'combinations': # Convert combinations to simple string list serializable_context[key] = [ f"{' + '.join([f'{c[0]}-{c[1]}' for c 
in combo[4]])}" for combo in value[:10] ] elif isinstance(value, (str, int, float, bool, type(None))): serializable_context[key] = value elif isinstance(value, dict): serializable_context[key] = str(value)[:500] else: serializable_context[key] = str(value)[:500] context_str = json.dumps(serializable_context, ensure_ascii=False, indent=2)[:2000] base_prompts = { "researcher": f"""As a research expert, perform the following task: Task: {task} Provided Context: {context_str} Actions: 1. Research related prior art and data 2. Summarize key findings 3. **Questions for other agents**: Specify information needed or items requiring verification in [QUESTION FOR {{role}}] format Example: [QUESTION FOR CREATOR] How can this technology be creatively utilized?""", "creator": f"""As a creative expert, perform the following task: Task: {task} Provided Context: {context_str} Actions: 1. Generate innovative ideas 2. Highlight differences from existing technology 3. **Questions for other agents**: Specify areas needing verification or additional information in [QUESTION FOR {{role}}] format Example: [QUESTION FOR CRITIC] Are there any patentability issues with this idea?""", "supervisor": f"""As a supervisor, perform the following task: Task: {task} Provided Context: {context_str} Actions: 1. Coordinate overall process 2. Coordinate between agents 3. Set priorities 4. **Instructions to other agents**: Specify additional work or improvements needed in [REQUEST TO {{role}}] format""", "analyst": f"""As an analysis expert, perform the following task: Task: {task} Provided Context: {context_str} Actions: 1. Analyze current results 2. Evaluate strengths and weaknesses 3. Suggest improvement directions 4. 
**Questions for other agents**: Additional information needed for evaluation in [QUESTION FOR {{role}}] format""" } return base_prompts.get(agent_role, "") def _extract_questions(self, output: str) -> List[Tuple[str, str]]: """Extract questions from output""" questions = [] pattern = r'\[QUESTION FOR ([A-Z]+)\](.*?)(?=\[|$)' matches = re.finditer(pattern, output, re.DOTALL | re.IGNORECASE) for match in matches: role = match.group(1).lower() question = match.group(2).strip() questions.append((role, question)) return questions def _get_answers_from_agents( self, asking_agent: str, questions: List[Tuple[str, str]], context: Dict, language: str ) -> Dict[str, str]: """Get answers from other agents""" answers = {} # Make context JSON serializable serializable_context = {} for key, value in context.items(): if key == 'categories': # Convert Category list to dictionary list if isinstance(value, list) and len(value) > 0: serializable_context[key] = [ { 'name_en': cat.name_en if hasattr(cat, 'name_en') else str(cat), 'tags': cat.tags if hasattr(cat, 'tags') else [], 'items': cat.items[:5] if hasattr(cat, 'items') else [] } for cat in value[:10] ] else: serializable_context[key] = [] elif key == 'combinations': # Convert combinations to simple string list if isinstance(value, list): serializable_context[key] = [ f"{' + '.join([f'{c[0]}-{c[1]}' for c in combo[4]])}" for combo in value[:10] if len(combo) > 4 ] else: serializable_context[key] = [] elif isinstance(value, (str, int, float, bool, type(None))): serializable_context[key] = value elif isinstance(value, dict): serializable_context[key] = str(value)[:500] else: serializable_context[key] = str(value)[:500] for target_role, question in questions: if target_role in ["researcher", "creator", "critic", "supervisor", "analyst"]: prompt = f"""{asking_agent.upper()} agent's question - please answer: Question: {question} Context: {json.dumps(serializable_context, ensure_ascii=False, indent=2)[:1500]} Provide a clear and concise 
answer.""" answer = "" for chunk in self.call_llm_streaming( [{"role": "user", "content": prompt}], target_role, max_tokens=2048, language=language ): answer += chunk answers[target_role] = answer return answers def _parse_fact_checks(self, critic_output: str) -> List[Dict]: """Parse fact-check results from critic output""" fact_checks = [] patterns = { 'verified': r'✅\s*Verified:(.+?)(?=\n[✅⚠️❌]|\Z)', 'suspicious': r'⚠️\s*Questionable:(.+?)(?=\n[✅⚠️❌]|\Z)', 'error': r'❌\s*Error:(.+?)(?=\n[✅⚠️❌]|\Z)' } for status, pattern in patterns.items(): matches = re.finditer(pattern, critic_output, re.DOTALL) for match in matches: content = match.group(1).strip() fact_checks.append({ 'status': status, 'content': content, 'timestamp': time.time() }) return fact_checks def _verify_with_web_search( self, fact_checks: List[Dict], context: Dict ) -> List[Dict]: """Verify facts with web search""" verified = [] for fact in fact_checks: if fact['status'] in ['suspicious', 'error']: search_query = fact['content'][:200] try: search_results = do_web_search(search_query) verified.append({ **fact, 'web_verification': search_results[:500], 'verified_at': time.time() }) except Exception as e: logging.error(f"Web verification error: {e}") verified.append({ **fact, 'web_verification': 'Verification failed', 'error': str(e) }) else: verified.append(fact) return verified def _extract_scores(self, analyst_output: str) -> Dict[str, float]: """Extract scores from analyst output""" scores = { 'completeness': 0, 'originality': 0, 'feasibility': 0, 'patentability': 0, 'synergy': 0, 'total': 0 } patterns = { 'completeness': r'Completeness[:\s]*(\d+(?:\.\d+)?)', 'originality': r'Originality[:\s]*(\d+(?:\.\d+)?)', 'feasibility': r'Feasibility[:\s]*(\d+(?:\.\d+)?)', 'patentability': r'Patentability[:\s]*(\d+(?:\.\d+)?)', 'synergy': r'Collaboration\s*Synergy[:\s]*(\d+(?:\.\d+)?)', 'total': r'Overall\s*Score[:\s]*(\d+(?:\.\d+)?)' } for key, pattern in patterns.items(): match = re.search(pattern, 
analyst_output, re.IGNORECASE) if match: try: scores[key] = float(match.group(1)) except: pass return scores def _calculate_consensus(self, round_results: Dict) -> float: """Calculate collaboration consensus score""" if 'analyst' in round_results: analyst_scores = round_results['analyst'].get('scores', {}) if analyst_scores.get('total', 0) > 0: return min(1.0, analyst_scores['total'] / 50.0) return 0.7 def _merge_round_results(self, round_results: Dict) -> Dict: """Merge round results - JSON serialization safe""" merged = {} for agent, result in round_results.items(): if isinstance(result, dict) and 'output' in result: # Store only text merged[agent] = result['output'][:2000] # Length limit elif isinstance(result, str): merged[agent] = result[:2000] return merged def _generate_final_output( self, current_output: Dict, agent_contributions: Dict, language: str ) -> str: """Generate final output""" final_prompt = f"""Synthesize all agent collaboration results and write a final report: Collaboration Results: {json.dumps(current_output, ensure_ascii=False, indent=2)[:5000]} Write in the following format: ## Collaboration Summary - Participating agents and contributions - Key findings - Consensus conclusions ## Final Results [Specific deliverables] ## Verification and Fact-checking [Verified facts and evidence] ## Patentability Assessment [Novelty, inventive step, industrial applicability] ## Next Steps [Additional work and improvement directions]""" final_output = "" for chunk in self.call_llm_streaming( [{"role": "user", "content": final_prompt}], "supervisor", max_tokens=8000, language=language ): final_output += chunk return final_output # Physical Categories loading function @st.cache_data(ttl=3600) def load_physical_categories(): """Load category data from environment variable or file (JSON only)""" seed_text = os.getenv("SEED_TEXT", "") if seed_text: if seed_text.endswith('.json'): try: with open(seed_text, 'r', encoding='utf-8') as f: logging.info(f"Loading 
categories from file: {seed_text}") return json.load(f) except FileNotFoundError: logging.error(f"Category file {seed_text} not found") raise FileNotFoundError(f"Required category file not found: {seed_text}") except json.JSONDecodeError as e: logging.error(f"Error decoding JSON file: {e}") raise elif seed_text.startswith(('http://', 'https://')): try: logging.info(f"Loading categories from URL: {seed_text}") response = requests.get(seed_text, timeout=10) response.raise_for_status() return response.json() except Exception as e: logging.error(f"Failed to load from URL: {e}") raise else: try: logging.info("Loading categories from JSON string") return json.loads(seed_text) except json.JSONDecodeError as e: logging.error("SEED_TEXT is not valid JSON") raise default_files = [ "physical_categories.json", "./physical_categories.json", "/app/physical_categories.json", os.path.join(os.path.dirname(__file__), "physical_categories.json") ] for filepath in default_files: if os.path.exists(filepath): try: with open(filepath, 'r', encoding='utf-8') as f: logging.info(f"Loading categories from default file: {filepath}") return json.load(f) except Exception as e: logging.error(f"Error loading from {filepath}: {e}") continue error_msg = "No physical_categories.json file found. Please ensure the file exists or set SEED_TEXT environment variable." 
logging.error(error_msg) raise FileNotFoundError(error_msg) try: physical_transformation_categories = load_physical_categories() except Exception as e: st.error(f"⚠️ Failed to load physical categories: {str(e)}") st.stop() @dataclass class Category: """Category data class""" name_ko: str name_en: str tags: List[str] items: List[str] CATEGORY_NAME_TRANSLATIONS = { "센서 기능": "Sensor Functions", "크기와 형태 변화": "Size and Shape Change", "표면 및 외관 변화": "Surface and Appearance Change", "물질의 상태 변화": "Material State Change", "움직임 특성 변화": "Movement Characteristics Change", "구조적 변화": "Structural Change", "공간 이동": "Spatial Movement", "시간 관련 변화": "Time-Related Change", "빛과 시각 효과": "Light and Visual Effects", "소리와 진동 효과": "Sound and Vibration Effects", "열 관련 변화": "Thermal Changes", "전기 및 자기 변화": "Electrical and Magnetic Changes", "화학적 변화": "Chemical Change", "생물학적 변화": "Biological Change", "환경 상호작용": "Environmental Interaction", "비즈니스 아이디어": "Business Ideas", "사용자 인터페이스 및 상호작용": "User Interface and Interaction", "데이터 및 정보 변환": "Data and Information Transformation", "인지 및 심리적 변화": "Cognitive and Psychological Changes", "에너지 변환 및 관리": "Energy Conversion and Management", "지속가능성 및 환경 영향": "Sustainability and Environmental Impact", "보안 및 프라이버시": "Security and Privacy", "사회적 상호작용 및 협업": "Social Interaction and Collaboration", "미학 및 감성 경험": "Aesthetics and Emotional Experience" } CATEGORY_TAGS = { "센서 기능": ["sensor", "detection"], "크기와 형태 변화": ["shape", "geometry"], "표면 및 외관 변화": ["surface", "appearance"], "물질의 상태 변화": ["material", "state"], "움직임 특성 변화": ["motion", "dynamics"], "구조적 변화": ["structure", "form"], "공간 이동": ["movement", "space"], "시간 관련 변화": ["time", "aging"], "빛과 시각 효과": ["light", "visual"], "소리와 진동 효과": ["sound", "vibration"], "열 관련 변화": ["heat", "thermal"], "전기 및 자기 변화": ["electric", "magnetic"], "화학적 변화": ["chemical", "reaction"], "생물학적 변화": ["bio", "living"], "환경 상호작용": ["environment", "interaction"], "비즈니스 아이디어": ["business", "idea"], "사용자 인터페이스 및 상호작용": ["interface", 
"interaction"], "데이터 및 정보 변환": ["data", "information"], "인지 및 심리적 변화": ["cognitive", "psychology"], "에너지 변환 및 관리": ["energy", "power"], "지속가능성 및 환경 영향": ["sustainability", "eco"], "보안 및 프라이버시": ["security", "privacy"], "사회적 상호작용 및 협업": ["social", "collaboration"], "미학 및 감성 경험": ["aesthetics", "emotion"] } PHYS_CATEGORIES = [] for name_ko, items in physical_transformation_categories.items(): category = Category( name_ko=name_ko, name_en=CATEGORY_NAME_TRANSLATIONS.get(name_ko, name_ko), tags=CATEGORY_TAGS.get(name_ko, []), items=items ) PHYS_CATEGORIES.append(category) logging.info(f"Successfully loaded {len(PHYS_CATEGORIES)} physical transformation categories") logging.info(f"Total items across all categories: {sum(len(cat.items) for cat in PHYS_CATEGORIES)}") # Web search functions @st.cache_data(ttl=3600) def brave_search(query: str, count: int = 20): if not BRAVE_KEY: raise RuntimeError("⚠️ BAPI_TOKEN (Brave API Key) is missing.") headers = { "Accept": "application/json", "Accept-Encoding": "gzip", "X-Subscription-Token": BRAVE_KEY } params = {"q": query, "count": str(count)} for attempt in range(3): try: r = requests.get(BRAVE_ENDPOINT, headers=headers, params=params, timeout=15) r.raise_for_status() data = r.json() raw = data.get("web", {}).get("results") or data.get("results", []) if not raw: raise ValueError("No search results found.") arts = [] for i, res in enumerate(raw[:count], 1): url = res.get("url", res.get("link", "")) host = re.sub(r"https?://(www\.)?", "", url).split("/")[0] arts.append({ "index": i, "title": res.get("title", "No title"), "link": url, "snippet": res.get("description", res.get("text", "No snippet")), "displayed_link": host }) return arts except Exception as e: logging.error(f"Brave search failure (attempt {attempt+1}/3): {e}") time.sleep(1) return [] def mock_results(query: str) -> str: ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") return ( f"# Fallback Search Content (Generated: {ts})\n\n" f"The web search API request failed. 
def do_web_search(query: str) -> str:
    """Search the web for ``query`` and render the hits as Markdown.

    Falls back to the placeholder text from ``mock_results`` when the search
    returns nothing or anything in the process raises.
    """
    try:
        articles = brave_search(query, 20)
        if not articles:
            logging.warning("No Brave search results. Using fallback.")
            return mock_results(query)
        header = "# Web Search Results\nPrior art and existing technology information.\n\n"
        sections = [
            f"### Result {a['index']}: {a['title']}\n\n{a['snippet']}\n\n**Source**: [{a['displayed_link']}]({a['link']})\n\n---\n"
            for a in articles
        ]
        return header + "\n".join(sections)
    except Exception as e:
        logging.error(f"Web search process failed: {str(e)}")
        return mock_results(query)


def identify_decision_purpose(prompt: str) -> dict:
    """Score a prompt against purpose and constraint keyword patterns.

    Each label's score is the number of its patterns found in ``prompt``
    (case-insensitive regex search). Labels with no hits are omitted.

    Returns:
        Dict with ``purposes`` and ``constraints`` (the two best
        ``(label, score)`` pairs each) plus ``all_purpose_scores`` and
        ``all_constraint_scores`` (every label that scored).
    """
    purpose_patterns = {
        'cost_reduction': [r'cost', r'saving', r'budget', r'efficient', r'economy'],
        'innovation': [r'innovation', r'creative', r'develop', r'patent', r'invention'],
        'risk_management': [r'risk', r'safety', r'prevent', r'secure'],
        'growth': [r'growth', r'expand', r'increase', r'scale', r'revenue'],
        'customer': [r'customer', r'user', r'satisfaction', r'experience', r'service'],
    }
    constraint_patterns = {
        'time': [r'time', r'quickly', r'urgent', r'deadline'],
        'budget': [r'low budget', r'fund', r'investment', r'finance'],
        'resources': [r'resource', r'staff', r'equipment', r'limited'],
        'regulation': [r'regulation', r'legal', r'compliance', r'patent'],
    }

    def tally(pattern_groups):
        hits = {}
        for label, regexes in pattern_groups.items():
            count = len([rx for rx in regexes if re.search(rx, prompt, re.IGNORECASE)])
            if count > 0:
                hits[label] = count
        return hits

    def best_two(hits):
        ranked = sorted(hits.items(), key=lambda pair: pair[1], reverse=True)
        return ranked[:2]

    purpose_scores = tally(purpose_patterns)
    constraint_scores = tally(constraint_patterns)
    return {
        'purposes': best_two(purpose_scores),
        'constraints': best_two(constraint_scores),
        'all_purpose_scores': purpose_scores,
        'all_constraint_scores': constraint_scores,
    }


def keywords(text: str, top: int = 8) -> str:
    """Return the ``top`` most frequent non-stopword words, space-separated.

    Words are runs of two or more ASCII letters, compared in lower case.
    Ties keep the order of first appearance.
    """
    stopwords = {
        'the', 'a', 'an', 'of', 'to', 'in', 'for', 'on', 'by', 'and', 'is',
        'are', 'was', 'were', 'be', 'been', 'being', 'with', 'as', 'at',
        'that', 'this', 'these', 'those', 'from', 'not'
    }
    tokens = re.findall(r'\b[a-zA-Z]{2,}\b', text.lower())
    counts = Counter(token for token in tokens if token not in stopwords)
    ranked = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
    return ' '.join(word for word, _ in ranked[:top])
def process_uploaded_files(uploaded_files):
    """Convert uploaded files into one Markdown section for the prompt.

    Each file is handled by extension (``txt``, ``csv``, ``pdf``); files over
    50 MB and unknown extensions get a short notice instead. A failure while
    processing one file is logged and reported in that file's entry, so the
    remaining files are still processed. Every file is rewound afterwards.

    Returns an empty string when nothing was uploaded.
    """
    if not uploaded_files:
        return ""

    max_bytes = 50 * 1024 * 1024
    file_contents = []
    for file in uploaded_files:
        try:
            if file.size > max_bytes:
                file_contents.append(f"# {file.name}\n\nFile size too large (over 50MB).")
                continue

            ext = file.name.split('.')[-1].lower()
            if ext == 'txt':
                file_contents.append(process_text_file(file))
            elif ext == 'csv':
                file_contents.append(process_csv_file(file))
            elif ext == 'pdf':
                file_contents.append(process_pdf_file(file))
            else:
                file_contents.append(
                    f"# Unsupported file: {file.name}\n\nThis file type is not supported for processing."
                )
        except Exception as e:
            logging.error(f"Error processing file {file.name}: {str(e)}")
            file_contents.append(f"# Error processing file: {file.name}\n\n{str(e)}")
        finally:
            # Rewind so the same upload can be read again later; a stream
            # that cannot seek is not worth failing the whole batch for.
            try:
                file.seek(0)
            except Exception as seek_error:
                logging.debug(f"Could not rewind uploaded file: {seek_error}")

    return "\n\n# User Uploaded File Analysis\n\n" + "\n\n---\n\n".join(file_contents)


def generate_image(prompt: str):
    """Request an image for *prompt* from the configured Gradio endpoint.

    Returns ``(image, cleaned_prompt)`` on success and ``(None, None)`` when
    the prompt is empty or shorter than three characters after cleaning, the
    Gradio client is unavailable, ``IMAGE_API_URL`` is unset, the response is
    not a sequence of at least two items with a truthy first item, or any
    error occurs (errors are logged, never raised).
    """
    if not prompt or Client is None:
        return None, None

    try:
        clean_prompt = prompt.strip("\"'").strip()
        if len(clean_prompt) < 3:
            return None, None

        logging.info(f"Sending image generation request with prompt: {clean_prompt}")

        if not IMAGE_API_URL:
            logging.warning("IMAGE_API_URL not set. Image generation disabled.")
            return None, None

        res = Client(IMAGE_API_URL).predict(
            prompt=clean_prompt,
            width=768,
            height=768,
            guidance=3.5,
            inference_steps=30,
            seed=3,
            do_img2img=False,
            init_image=None,
            image2image_strength=0.8,
            resize_img=True,
            api_name="/generate_image"
        )

        if res and len(res) >= 2 and res[0]:
            logging.info("Successfully received image data")
            return res[0], clean_prompt

        logging.warning(f"Invalid response format from image API: {res}")
        return None, None
    except Exception as e:
        logging.error(f"Image generation error: {str(e)}", exc_info=True)
        return None, None


# Words whose presence (as substrings of the lower-cased prompt) votes for a purpose.
_PURPOSE_KEYWORDS = {
    'cost_reduction': ['cost', 'saving', 'budget', 'efficiency'],
    'innovation': ['innovation', 'creative', 'novel', 'development', 'invention', 'design'],
    'risk_management': ['risk', 'management', 'prevention', 'mitigation'],
    'growth': ['growth', 'expansion', 'increase', 'scale'],
    'customer': ['user', 'customer', 'satisfaction', 'experience']
}

# Per-purpose multipliers applied to a category's base score (keyed by category.name_en).
_PURPOSE_CATEGORY_WEIGHTS = {
    'cost_reduction': {
        'Structural Change': 1.5,
        'Chemical Change': 1.3,
        'Business Ideas': 1.5,
        'Energy Conversion and Management': 1.6,
        'Data and Information Transformation': 1.4,
        'Sustainability and Environmental Impact': 1.3
    },
    'innovation': {
        'Sensor Functions': 1.5,
        'Surface and Appearance Change': 1.3,
        'Business Ideas': 1.5,
        'User Interface and Interaction': 1.6,
        'Data and Information Transformation': 1.4,
        'Cognitive and Psychological Changes': 1.3
    },
    'risk_management': {
        'Environmental Interaction': 1.5,
        'Time-Related Change': 1.3,
        'Business Ideas': 1.4,
        'Security and Privacy': 1.7,
        'Sustainability and Environmental Impact': 1.5,
        'Data and Information Transformation': 1.4
    },
    'growth': {
        'Size and Shape Change': 1.4,
        'Business Ideas': 1.6,
        'Structural Change': 1.3,
        'Social Interaction and Collaboration': 1.5,
        'Data and Information Transformation': 1.4,
        'User Interface and Interaction': 1.3
    },
    'customer': {
        'Surface and Appearance Change': 1.5,
        'Sensor Functions': 1.4,
        'Light and Visual Effects': 1.3,
        'Business Ideas': 1.4,
        'User Interface and Interaction': 1.7,
        'Aesthetics and Emotional Experience': 1.6,
        'Cognitive and Psychological Changes': 1.5,
        'Social Interaction and Collaboration': 1.4
    }
}

_WORD_TOKEN_RE = re.compile(r'\b[a-zA-Z]{2,}\b')


def compute_relevance_scores(prompt: str, categories: "list[Category]") -> dict:
    """Score every ``(category name, item)`` pair against the prompt.

    Category base score: 0.5 per tag found in the lower-cased prompt, plus 1
    if the category's English name occurs in it. If the prompt has a dominant
    purpose (most keyword hits), the base score is multiplied by that
    purpose's weight for the category, when one is defined. Each item then
    adds 0.3 per word it shares with the prompt. Only pairs with a positive
    score are returned.

    Categories are read through their ``name_en``, ``tags`` and ``items``
    attributes.
    """
    prompt_lower = prompt.lower()
    prompt_tokens = set(_WORD_TOKEN_RE.findall(prompt_lower))

    purpose_scores = {}
    for purpose, purpose_words in _PURPOSE_KEYWORDS.items():
        hits = sum(1 for kw in purpose_words if kw in prompt_lower)
        if hits > 0:
            purpose_scores[purpose] = hits

    main_purpose = max(purpose_scores.items(), key=lambda x: x[1])[0] if purpose_scores else None
    boosts = _PURPOSE_CATEGORY_WEIGHTS.get(main_purpose, {})

    relevance_scores = {}
    for category in categories:
        cat_score = sum(1 for tag in category.tags if tag in prompt_lower) * 0.5
        if category.name_en.lower() in prompt_lower:
            cat_score += 1
        if category.name_en in boosts:
            cat_score *= boosts[category.name_en]

        for item in category.items:
            shared = set(_WORD_TOKEN_RE.findall(item.lower())) & prompt_tokens
            item_score = cat_score + len(shared) * 0.3
            if item_score > 0:
                relevance_scores[(category.name_en, item)] = item_score

    return relevance_scores


def compute_score(weight: int, impact: int, confidence: float) -> float:
    """Return ``weight * impact * confidence`` rounded to two decimals."""
    return round(weight * impact * confidence, 2)


def generate_comparison_matrix(
    categories: "list[Category]",
    relevance_scores: dict = None,
    max_depth: int = 3,
    max_combinations: int = 100,
    relevance_threshold: float = 0.2
) -> "list[tuple]":
    """Build scored combinations of ``(category name, item)`` pairs.

    Each result is ``(weight, impact, confidence, total_score, combo)``.

    * ``relevance_scores is None``: the first *max_combinations* combinations
      of sizes 2..*max_depth* over all items, each with neutral scores
      ``(1, 1, 1.0, 1.0)``.
    * otherwise: combinations are drawn from pairs scoring at least
      *relevance_threshold* (or, if none qualify, from all items, randomly
      sampled down to 200), restricted to combinations whose members all
      come from different categories, scored from their mean relevance and
      returned best first, capped at *max_combinations*.
    """
    if relevance_scores is None:
        from itertools import chain, islice

        pool = [(c.name_en, item) for c in categories for item in c.items]
        in_depth_order = chain.from_iterable(
            combinations(pool, depth) for depth in range(2, max_depth + 1)
        )
        # Stop generating as soon as the cap is reached instead of walking
        # every depth and trimming afterwards.
        return [
            (1, 1, 1.0, 1.0, combo)
            for combo in islice(in_depth_order, max(max_combinations, 0))
        ]

    filtered_pool = [
        pair for pair, score in relevance_scores.items()
        if score >= relevance_threshold
    ]
    if not filtered_pool:
        pool = [(c.name_en, i) for c in categories for i in c.items]
        filtered_pool = random.sample(pool, 200) if len(pool) > 200 else pool

    evaluated_combinations = []
    for depth in range(2, max_depth + 1):
        for combo in combinations(filtered_pool, depth):
            if len({cat_name for cat_name, _ in combo}) != depth:
                continue  # at most one item per category
            combo_relevance = sum(relevance_scores.get(pair, 0) for pair in combo) / depth
            weight = min(5, max(1, int(combo_relevance * 2)))
            impact = min(5, depth)
            confidence = min(1.0, combo_relevance / 2.5)
            total_score = compute_score(weight, impact, confidence)
            evaluated_combinations.append((weight, impact, confidence, total_score, combo))

    evaluated_combinations.sort(key=lambda x: x[3], reverse=True)
    return evaluated_combinations[:max_combinations]


def smart_weight(cat_name, item, relevance, global_cnt, T):
    """Return a non-negative random sampling weight for *item*.

    The weight grows with *relevance*, shrinks the more often *item* has been
    picked before (``global_cnt``), and is multiplied by random noise whose
    spread is controlled by temperature *T*. *T* is clamped to at least 0.1
    (avoiding a division by zero) and the relevance share is clamped at zero
    so that very high temperatures cannot produce negative weights.
    ``cat_name`` is accepted for call-site symmetry but not used.
    """
    T = max(T, 0.1)
    rare_boost = 1 / (global_cnt.get(item, 0) + 0.5)
    noise = random.random() ** (1 / T)
    relevance_weight = max(0.0, 1 - (T - 0.1) / 3.0)
    return ((relevance * relevance_weight) + 0.1) * rare_boost * noise


def generate_random_comparison_matrix(
    categories: "list[Category]",
    relevance_scores: "dict | None" = None,
    k_cat=(8, 12),
    n_item=(6, 10),
    depth_range=(2, 3),
    max_combos=1000,
    seed: "int | None" = None,
    T: float = 1.3,
    allow_same_category: bool = True
):
    """Randomly sample categories and items, then score their combinations.

    Steps:
      1. Seed the global ``random`` generator (a fresh seed is drawn when
         *seed* is ``None``).
      2. Pick between ``k_cat[0]`` and ``k_cat[1]`` categories, never more
         than are available.
      3. From each picked category draw items with ``smart_weight`` weights
         (with replacement) and record every pick in
         ``st.session_state.GLOBAL_PICK_COUNT``.
      4. Score all combinations of 1-3 picks from distinct categories and,
         if *allow_same_category*, combinations of several picks from one
         category plus one or two picks from other categories.
      5. Keep the best-scoring entry per distinct set of picks and return up
         to *max_combos* tuples ``(weight, impact, confidence, total, combo)``
         sorted by total, best first.

    NOTE(review): ``depth_range`` is accepted but the combination sizes are
    fixed at 1-3; wiring it in would change results, so it is left as is.
    """
    if seed is None:
        seed = random.randrange(2 ** 32)
    random.seed(seed)

    if "GLOBAL_PICK_COUNT" not in st.session_state:
        st.session_state.GLOBAL_PICK_COUNT = {}
    global_cnt = st.session_state.GLOBAL_PICK_COUNT

    def mean_relevance(combo):
        # Without relevance data every member counts as 1.
        if relevance_scores:
            total = sum(relevance_scores.get(pair, 0.2) for pair in combo)
        else:
            total = len(combo)
        return total / len(combo)

    # random.sample raises when asked for more elements than exist.
    k = min(random.randint(*k_cat), len(categories))
    sampled_cats = random.sample(categories, k)

    pool = []
    category_items = {}
    for cat in sampled_cats:
        items = cat.items
        if not items:
            continue  # random.choices cannot draw from an empty population
        weights = [
            smart_weight(
                cat.name_en,
                it,
                relevance_scores.get((cat.name_en, it), 0.05) if relevance_scores else 0.05,
                global_cnt,
                T
            )
            for it in items
        ]
        n = min(len(items), random.randint(*n_item))
        sampled_items = random.choices(items, weights=weights, k=n)
        category_items[cat.name_en] = sampled_items
        for it in sampled_items:
            global_cnt[it] = global_cnt.get(it, 0) + 1
            pool.append((cat.name_en, it))

    combos = []
    for d in range(1, 4):
        for combo in combinations(pool, d):
            if len({c for c, _ in combo}) != d:
                continue
            w = mean_relevance(combo)
            conf = 0.5 + random.random() * 0.5
            combos.append((w, d, conf, compute_score(w, d, conf), combo))

    if allow_same_category:
        for cat_name, items in category_items.items():
            if len(items) < 2:
                continue
            other_pools = [(c, i) for c, i in pool if c != cat_name]
            if not other_pools:
                continue
            for combo_size in range(1, min(4, len(items) + 1)):
                for item_combo in combinations(items, combo_size):
                    same_cat_combo = [(cat_name, item) for item in item_combo]
                    additional_items = random.sample(
                        other_pools,
                        min(random.randint(1, 2), len(other_pools))
                    )
                    full_combo = tuple(same_cat_combo + additional_items)
                    w = mean_relevance(full_combo) * 1.2
                    imp = len(full_combo)
                    conf = 0.6 + random.random() * 0.4
                    combos.append((w, imp, conf, compute_score(w, imp, conf), full_combo))

    # Collapse entries that contain the same set of picks, keeping the best.
    unique_combos = {}
    for combo_data in combos:
        combo_key = frozenset(combo_data[4])
        best = unique_combos.get(combo_key)
        if best is None or best[3] < combo_data[3]:
            unique_combos[combo_key] = combo_data

    ranked = sorted(unique_combos.values(), key=lambda x: x[3], reverse=True)
    return ranked[:max_combos]


def generate_combo_rationale(combo, weight):
    """Return a one-sentence rationale whose tone depends on *weight*.

    ``weight > 3`` reads as high synergy, ``weight > 1.5`` as moderate
    potential, anything lower as an experimental fusion.
    """
    joined = ' and '.join(f"{c[0]}-{c[1]}" for c in combo)
    if weight > 3:
        return f"High synergy between {joined} for innovative solutions"
    if weight > 1.5:
        return f"Moderate potential combining {joined} for balanced innovation"
    return f"Experimental fusion of {joined} for breakthrough thinking"


def create_structured_combinations(combos, temperature, language="English"):
    """Bucket scored combinations and pick a temperature-dependent mix.

    Combinations are split by total score into conservative (> 7), moderate
    (> 4) and radical (the rest). Low temperatures (< 1.0) favour
    conservative picks, mid temperatures (< 2.0) mix all three, and higher
    temperatures favour radical picks.

    Returns ``(selected, prompt_text)`` where *selected* is a list of dicts
    (``elements``, ``score``, ``weight``, ``impact``, ``confidence``,
    ``rationale``, ``raw_combo``) and *prompt_text* is a Markdown list of up
    to 20 of them. ``language`` is currently unused.
    """
    conservative_combos = []
    moderate_combos = []
    radical_combos = []

    for w, imp, conf, tot, cmb in combos:
        combo_dict = {
            "elements": [f"{c[0]}-{c[1]}" for c in cmb],
            "score": tot,
            "weight": w,
            "impact": imp,
            "confidence": conf,
            "rationale": generate_combo_rationale(cmb, w),
            "raw_combo": cmb
        }
        if tot > 7:
            conservative_combos.append(combo_dict)
        elif tot > 4:
            moderate_combos.append(combo_dict)
        else:
            radical_combos.append(combo_dict)

    if temperature < 1.0:
        selected_combos = conservative_combos[:10] + moderate_combos[:5]
    elif temperature < 2.0:
        selected_combos = conservative_combos[:5] + moderate_combos[:10] + radical_combos[:5]
    else:
        selected_combos = moderate_combos[:5] + radical_combos[:15]

    prompt_parts = [
        "\n## Creative Combination Matrix\n\n",
        "Use these combinations to generate innovative ideas:\n\n",
    ]
    for i, combo in enumerate(selected_combos[:20], 1):
        prompt_parts.append(f"{i}. **{' + '.join(combo['elements'])}**\n")
        prompt_parts.append(f" - Score: {combo['score']:.1f} | {combo['rationale']}\n")

    return selected_combos, "".join(prompt_parts)


def get_role_specific_combinations(role, combos, design_context, temperature):
    """Select the combinations and prompt text suited to an agent role.

    * ``creator``: combinations scoring below 5, topped up to ten with other
      selected combinations (never repeating one), at most 15 returned.
    * ``researcher``: combinations with confidence above 0.6, at most 15.
    * ``analyst``: all selected combinations ordered by
      ``score * confidence``, at most 15.
    * any other role: the first 20 selected combinations, unmodified prompt.

    Returns ``(combinations, prompt_text)``.
    """
    structured_combos, combo_prompt = create_structured_combinations(
        combos, temperature, design_context.get('language', 'English')
    )

    if role == "creator":
        radical_combos = [c for c in structured_combos if c['score'] < 5]
        if len(radical_combos) < 10:
            # Pad with the remaining combinations; tracking identities keeps
            # an already selected combination from being listed twice.
            chosen_ids = {id(c) for c in radical_combos}
            for candidate in structured_combos:
                if len(radical_combos) >= 10:
                    break
                if id(candidate) not in chosen_ids:
                    radical_combos.append(candidate)
                    chosen_ids.add(id(candidate))
        return radical_combos[:15], combo_prompt + "\n**Creator Focus**: Prioritize radical and unconventional combinations.\n"

    if role == "researcher":
        feasible_combos = [c for c in structured_combos if c['confidence'] > 0.6]
        return feasible_combos[:15], combo_prompt + "\n**Researcher Focus**: Focus on technically feasible combinations.\n"

    if role == "analyst":
        balanced_combos = sorted(
            structured_combos, key=lambda x: x['score'] * x['confidence'], reverse=True
        )
        return balanced_combos[:15], combo_prompt + "\n**Analyst Focus**: Balance innovation with market viability.\n"

    return structured_combos[:20], combo_prompt
reverse=True)]) prompts = { "researcher_comprehensive": f"""As a physical transformation research specialist, generate invention ideas for: Topic: {query} **IMPORTANT**: Each idea must follow this exact format: ## [Category Name] (Relevance Score: X.X) 1. **[Specific Idea Title]**: - Technical Problem: [Problem to solve] - Solution Means: [Solution method] - Technical Effect: [Expected effects] - Feasibility: [High/Medium/Low] 2. **[Next Idea Title]**: - Technical Problem: ... Generate 3-10 ideas per category based on relevance. High relevance (3.0+): 8-10 ideas, Medium (1.0-3.0): 5-7 ideas, Low (<1.0): 3-5 ideas ## Category Relevance Scores: {relevance_scores_str} Goal: Generate 100-150 specific and feasible ideas""", "supervisor_organize_with_search": f"""As senior supervisor, perform the following: Original topic: {query} Collected ideas: {{idea_count}} **Web search results-based analysis**: {web_search_results[:3000] if web_search_results else "No web search results"} Tasks: 1. Prior art investigation and comparison 2. Latest trends integration 3. Patent-oriented restructuring 4. Deduplication and consolidation 5. Finalize to 80-100 ideas Output format: ### [Rank]. [Idea Name] - **Differentiation from prior art**: [Differences] - **Technical Configuration**: [Mechanism] - **Innovation Assessment**: [High/Medium/Low] - **Marketability**: [Application fields]""", "critic_patent_evaluation": f"""As a patent evaluation expert, assess using these criteria: Original topic: {query} Ideas to evaluate: {{idea_count}} **Patentability criteria**: 1. Novelty (30%) 2. Inventive Step (30%) 3. Industrial Applicability (20%) 4. Specification Draftability (20%) Select top 30 with highest patentability""", "creator_enhancement": f"""As creativity expert, maximize selected ideas: **Creative expansion**: 1. Maximize top 30 ideas 2. Explore innovative combinations 3. 
Balance practicality and creativity Generate final TOP 50 patent ideas""" } return prompts def format_final_output(evaluated_ideas, all_ideas, organized_ideas, language="English"): """Generate final output format - with fusion information and summary emphasis""" output = "# Final TOP 50 Fusion Patent Ideas\n\n" output += create_summary_dashboard( all_ideas, organized_ideas, evaluated_ideas, evaluated_ideas[:50], language ) output += "\n---\n\n" output += "## Detailed Patent Specifications (TOP 50)\n\n" for rank, idea in enumerate(evaluated_ideas[:50], 1): combinations_list = idea.get('combinations', []) if not combinations_list or len(combinations_list) < 2: combinations_list = ["Unspecified Element 1", "Unspecified Element 2"] combination_str = " + ".join(combinations_list) synergy_effect = idea.get('synergy_effect', 'Synergy effect from fusion') # Summary section summary_section = f""" ### 📋 Executive Summary **💡 Need and Problem** {idea.get('technical_problem', 'Technical challenge to be solved by the invention')} **🔧 Solution** {idea.get('solution_means', 'Solution through innovative technical configuration')} **✨ Expected Benefits** {idea.get('technical_effects', 'Outstanding performance improvement and practicality')} --- """ output += f"""## Rank {rank}: {idea['title']} (Patentability Score: {idea.get('patent_score', idea.get('score', 0))}/100) {summary_section} ### [Fusion Configuration] **{combination_str}** ### [Title of Invention] {idea['title']} ### [Technical Field] The present invention relates to the field of {idea.get('category', 'fusion technology')}, and more particularly, to {idea['title']} that fuses {', '.join(combinations_list)}. 
def _comparison_cell(value, limit: int, ellipsis: bool = True) -> str:
    """Return *value* as text that is safe inside one Markdown table cell.

    ``None`` becomes an empty string, line breaks become spaces, the text is
    cut to *limit* characters (followed by ``...`` only when something was
    actually cut and *ellipsis* is set) and pipe characters are escaped.
    """
    text = "" if value is None else str(value)
    text = text.replace("\r", " ").replace("\n", " ")
    if len(text) > limit:
        text = text[:limit] + ("..." if ellipsis else "")
    return text.replace("|", "\\|")


def create_comparison_table(ideas: list, language: str = "English") -> str:
    """Render the first 20 ideas as a Markdown comparison table.

    Columns: rank, title (30 chars), category (15 chars), the novelty,
    inventive-step and industrial scores out of 10, the patent score out of
    100 and the prior-art differentiation (40 chars). Missing fields show as
    empty text or a score of 0. ``language`` is currently unused.
    """
    headers = [
        "Rank", "Title", "Category", "Novelty", "Inventive",
        "Industrial", "Patent Score", "Key Differentiation"
    ]

    lines = [
        "| " + " | ".join(headers) + " |",
        "| " + " | ".join(["---"] * len(headers)) + " |",
    ]
    for rank, idea in enumerate(ideas[:20], 1):
        row = [
            str(rank),
            _comparison_cell(idea.get('title', ''), 30),
            _comparison_cell(idea.get('category', ''), 15, ellipsis=False),
            f"{idea.get('novelty_score', 0)}/10",
            f"{idea.get('inventive_score', 0)}/10",
            f"{idea.get('industrial_score', 0)}/10",
            f"{idea.get('patent_score', 0)}/100",
            _comparison_cell(idea.get('prior_art_differentiation', ''), 40),
        ]
        lines.append("| " + " | ".join(row) + " |")

    return "\n".join(lines) + "\n"
def create_category_distribution_table(ideas: list) -> str:
    """Render a Markdown table of how many ideas fall in each category.

    Ideas without a ``category`` count as ``Unknown``. At most the ten most
    frequent categories are listed, most frequent first (ties keep the order
    of first appearance), each with its share of all ideas.
    """
    category_counts = Counter(idea.get('category', 'Unknown') for idea in ideas)

    rows = ["| Category | Ideas Count | Percentage |\n|----------|------------|------|\n"]
    # most_common() without an argument is a stable sort by count.
    for cat, count in category_counts.most_common()[:10]:
        percentage = (count / len(ideas)) * 100
        rows.append(f"| {cat} | {count} | {percentage:.1f}% |\n")
    return "".join(rows)


def _numeric_score(value) -> float:
    """Return *value* as a float, or 0.0 when it is missing or not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def create_patentability_distribution(ideas: list) -> str:
    """Render a Markdown histogram of ``patent_score`` in bands of ten.

    Bands run from "90-100" down to "Below 50". A missing, ``None`` or
    non-numeric score is counted as 0. With no ideas every band shows 0.0%.
    """
    # (lower bound, label), checked from the highest band down.
    bands = [
        (90, "90-100"),
        (80, "80-89"),
        (70, "70-79"),
        (60, "60-69"),
        (50, "50-59"),
    ]
    score_ranges = {label: 0 for _, label in bands}
    score_ranges["Below 50"] = 0

    for idea in ideas:
        score = _numeric_score(idea.get('patent_score', 0))
        label = next((name for floor, name in bands if score >= floor), "Below 50")
        score_ranges[label] += 1

    rows = ["| Score Range | Ideas Count | Percentage |\n|-----------|------------|------|\n"]
    for range_name, count in score_ranges.items():
        percentage = (count / len(ideas)) * 100 if ideas else 0
        rows.append(f"| {range_name} | {count} | {percentage:.1f}% |\n")
    return "".join(rows)
# Status text per phase: (shown when complete, shown while running).
_PHASE_STATUS_LABELS = {
    "search": ("✅ 1️⃣ Prior Art Search Complete", "🔄 1️⃣ Searching Prior Art..."),
    "research": ("✅ 2️⃣ Research Complete", "🔄 2️⃣ Research in Progress..."),
    "organize": ("✅ 3️⃣ Organization Complete", "🔄 3️⃣ Organizing..."),
    "evaluate": ("✅ 4️⃣ Evaluation Complete", "🔄 4️⃣ Evaluating..."),
}


def display_streaming_progress(phase_name, displays, content, metrics=None, is_complete=False):
    """Refresh the phase status, metrics and output widgets for one phase.

    Args:
        phase_name: ``"search"``, ``"research"``, ``"organize"`` or
            ``"evaluate"``; other names skip the status update.
        displays: widget handles; must contain ``output_placeholder``,
            ``metrics_placeholder`` and ``phase_status`` and may contain
            ``phase_results`` (a mapping of phase name to a container).
        content: text (the last 2,000 characters are shown while running) or
            a list of items (the last ten are shown while running).
        metrics: optional dict; the keys ``current``/``total``,
            ``label``/``value``, ``delta`` and ``phase`` each fill one of four
            metric columns.
        is_complete: when true the live output is cleared and the content is
            written into the phase's result container instead (first 20 list
            items, or text truncated to 10,000 characters).
    """
    output_placeholder = displays["output_placeholder"]
    metrics_placeholder = displays["metrics_placeholder"]
    phase_status = displays["phase_status"]
    phase_results = displays.get("phase_results", {})

    labels = _PHASE_STATUS_LABELS.get(phase_name)
    if labels:
        done_label, running_label = labels
        if is_complete:
            phase_status[phase_name].success(done_label)
        else:
            phase_status[phase_name].warning(running_label)

    if metrics:
        with metrics_placeholder.container():
            cols = st.columns(4)
            if "current" in metrics and "total" in metrics:
                cols[0].metric("Progress", f"{metrics['current']}/{metrics['total']}")
            if "label" in metrics and "value" in metrics:
                cols[1].metric(metrics['label'], metrics['value'])
            if "delta" in metrics:
                cols[2].metric("Status", metrics['delta'])
            if "phase" in metrics:
                cols[3].metric("Current Phase", metrics['phase'])

    if not is_complete:
        if isinstance(content, str):
            display_content = content[-2000:] if len(content) > 2000 else content
            output_placeholder.text_area(
                f"🔄 {phase_name.capitalize()} Phase Output",
                display_content,
                height=400,
                # The key changes with the content length so each update
                # renders as a distinct widget.
                key=f"stream_{phase_name}_{len(content)}"
            )
        elif isinstance(content, list):
            display_text = f"📋 Collected {len(content)} items so far...\n\n"
            for i, item in enumerate(content[-10:], 1):
                if isinstance(item, dict):
                    display_text += f"{i}. {item.get('title', 'No title')}\n"
                else:
                    display_text += f"{i}. {str(item)[:100]}...\n"
            output_placeholder.text_area(
                f"🔄 {phase_name.capitalize()} Phase Output",
                display_text,
                height=400,
                key=f"list_{phase_name}_{len(content)}"
            )
        return

    output_placeholder.empty()
    if phase_name not in phase_results:
        return

    with phase_results[phase_name]:
        if isinstance(content, list):
            st.write(f"**Total items: {len(content)}**")
            for i, item in enumerate(content[:20], 1):
                if isinstance(item, dict):
                    st.write(f"{i}. **{item.get('title', 'No title')}**")
                    if 'category' in item:
                        st.write(f" - Category: {item['category']}")
                    if 'technical_problem' in item:
                        st.write(f" - Problem: {item['technical_problem'][:100]}...")
                    if 'solution_means' in item:
                        st.write(f" - Solution: {item['solution_means'][:100]}...")
                    st.write("")
                else:
                    st.write(f"{i}. {str(item)[:200]}...")
            if len(content) > 20:
                st.write(f"\n... and {len(content) - 20} more items")
        elif isinstance(content, str) and len(content) > 10000:
            st.write(content[:10000] + "\n\n... (truncated)")
        else:
            # Short text, or any other value (including None), is written as is.
            st.write(content)


def update_process_progress(current_step, total_steps, displays):
    """Update the overall progress bar, step caption and percentage badge.

    The completed fraction ``current_step / total_steps`` is clamped to
    ``[0, 1]`` (and is 0 when *total_steps* is not positive). Steps 1-4 show
    their phase name, later steps show "Finalizing" and steps below 1 show
    "Not started".
    """
    step_names = [
        "Prior Art Search",
        "Research Phase",
        "Organization Phase",
        "Patent Evaluation"
    ]

    fraction = current_step / total_steps if total_steps > 0 else 0.0
    progress = min(1.0, max(0.0, fraction))
    displays["progress_bar"].progress(progress)

    if current_step < 1:
        # A plain index lookup would wrap around to the last phase name.
        step_label = "Not started"
    elif current_step <= len(step_names):
        step_label = step_names[current_step - 1]
    else:
        step_label = 'Finalizing'

    percent = int(progress * 100)
    displays["progress_text"].markdown(
        f"**Current Step**: {step_label} "
        f"({percent}% complete)"
    )

    icon = "🟡" if progress < 0.33 else "🟠" if progress < 0.66 else "🟢"
    displays["progress_placeholder"].markdown(f"{icon} {percent}%")
# Bullet labels recognised inside an idea, mapped to the idea field they fill.
_IDEA_FIELD_PATTERNS = (
    ("technical_problem", re.compile(r'^\s*-\s*Technical Problem[::]', re.IGNORECASE)),
    ("solution_means", re.compile(r'^\s*-\s*Solution Means[::]', re.IGNORECASE)),
    ("technical_effects", re.compile(r'^\s*-\s*Technical Effects?[::]', re.IGNORECASE)),
    ("synergy_effect", re.compile(r'^\s*-\s*Synergy Effect[::]', re.IGNORECASE)),
    ("feasibility", re.compile(r'^\s*-\s*Feasibility[::]', re.IGNORECASE)),
)

# Fields that may absorb an unlabelled continuation line, most recent first.
_CONTINUATION_FIELDS = ("feasibility", "technical_effects", "solution_means", "technical_problem")


def parse_ideas_from_response(response: str) -> list:
    """Parse the researcher's Markdown response into a list of idea dicts.

    Recognised structure:
      * ``## <Category> (Relevance Score: ...`` sets the current category;
      * ``N. **<Title>**`` starts a new idea;
      * ``- Technical Problem:``, ``- Solution Means:``,
        ``- Technical Effect(s):``, ``- Synergy Effect:`` and
        ``- Feasibility:`` bullets fill the matching field;
      * any other non-heading, non-numbered line is appended to the most
        recently filled field that is still shorter than 200 characters.

    When fewer than ten ideas are found, bold phrases from the response are
    added as fallback ideas, skipping phrases that are already idea titles.
    Every returned idea has ``combinations``, ``synergy_effect`` and
    ``category`` populated (with placeholder defaults if necessary).
    """
    ideas = []
    lines = response.split('\n')
    current_category = ""
    current_idea = None
    raw_titles = set()  # titles exactly as written, for fallback de-duplication

    logging.info(f"Parsing response with {len(lines)} lines")

    for raw_line in lines:
        line = raw_line.strip()

        category_match = re.match(r'^##\s*(.+?)\s*\(Relevance Score:', line)
        if category_match:
            current_category = category_match.group(1).strip()
            logging.info(f"Found category: {current_category}")
            continue

        idea_match = re.match(r'^(\d+)\.\s*\*\*(.+?)\*\*\s*[::]?', line)
        if idea_match:
            if current_idea and current_idea.get('title'):
                ideas.append(current_idea)
                logging.info(f"Saved idea: {current_idea['title']}")

            raw_title = idea_match.group(2).strip()
            raw_titles.add(raw_title)
            # Removing "based" can leave a dangling space at either end.
            title = re.sub(r'\s*based\s*', ' ', raw_title, flags=re.IGNORECASE).strip()

            current_idea = {
                "title": title,
                "category": current_category if current_category else "General",
                "combinations": [],
                "technical_problem": "",
                "solution_means": "",
                "technical_effects": "",
                "synergy_effect": "",
                "feasibility": ""
            }
            continue

        if not current_idea:
            continue

        labelled_field = next(
            (field for field, pattern in _IDEA_FIELD_PATTERNS if pattern.match(line)),
            None
        )
        if labelled_field:
            # Drop the "- Label:" prefix and keep only the value.
            current_idea[labelled_field] = re.sub(r'^\s*-\s*.*?[::]\s*', '', line)
        elif line and not line.startswith('#') and not re.match(r'^\d+\.', line):
            for field in _CONTINUATION_FIELDS:
                if current_idea[field] and len(current_idea[field]) < 200:
                    current_idea[field] += " " + line
                    break

    if current_idea and current_idea.get('title'):
        ideas.append(current_idea)
        logging.info(f"Saved last idea: {current_idea['title']}")

    logging.info(f"Successfully parsed {len(ideas)} ideas from response")

    if len(ideas) < 10:
        logging.warning(f"Only {len(ideas)} ideas parsed, creating fallback ideas")
        # Parsed titles are bold text too, so the fallback would otherwise
        # report every parsed idea a second time.
        known_titles = raw_titles | {idea["title"] for idea in ideas}
        ideas.extend(
            fallback for fallback in extract_fallback_ideas(response)
            if fallback["title"].strip() not in known_titles
        )

    for idea in ideas:
        if not idea.get("combinations"):
            idea["combinations"] = ["Element 1", "Element 2"]
        if not idea.get("synergy_effect"):
            idea["synergy_effect"] = "Complementary effect from fusion"
        if not idea.get("category"):
            idea["category"] = "General"

    return ideas


def extract_fallback_ideas(response: str) -> list:
    """Build placeholder ideas from bold phrases when normal parsing fails.

    The first 50 ``**bold**`` phrases are considered; those between 11 and 99
    characters long become ideas (each distinct phrase once), and at most 30
    ideas are returned.
    """
    fallback_ideas = []
    seen = set()
    for text in re.findall(r'\*\*(.+?)\*\*', response)[:50]:
        if not 10 < len(text) < 100 or text in seen:
            continue
        seen.add(text)
        fallback_ideas.append({
            "title": text,
            "category": "Auto-extracted",
            "combinations": ["Element 1", "Element 2"],
            "technical_problem": "Idea extracted during response parsing",
            "solution_means": text,
            "technical_effects": "Auto-generated effect description",
            "synergy_effect": "Fusion effect",
            "feasibility": "Medium"
        })
    return fallback_ideas[:30]


def format_ideas_for_processing(ideas: list) -> str:
    """Format ideas as a numbered text list for the next prompt.

    Each entry shows the category and title followed by the fusion elements,
    technical problem, solution means, synergy effect and technical effects.
    Missing fields render as empty text (category as "General"); entries are
    separated by a blank line.
    """
    formatted = []
    for i, idea in enumerate(ideas, 1):
        fusion_elements = idea.get('combinations', [])
        combo_str = " + ".join(map(str, fusion_elements)) if fusion_elements else "No fusion information"
        formatted.append(
            f"{i}. [{idea.get('category', 'General')}] {idea.get('title', '')}\n"
            f" - Fusion Elements: {combo_str}\n"
            f" - Technical Problem: {idea.get('technical_problem', '')}\n"
            f" - Solution Means: {idea.get('solution_means', '')}\n"
            f" - Synergy Effect: {idea.get('synergy_effect', '')}\n"
            f" - Technical Effects: {idea.get('technical_effects', '')}"
        )
    return "\n\n".join(formatted)
title.lower() for kw in prompt_keywords): organized.append(current_idea) logging.info(f"Added organized idea: {title}") rank = int(rank_match.group(1)) title = rank_match.group(2).strip() if any(kw in title.lower() for kw in prompt_keywords): logging.debug(f"Skipping prompt-like title: {title}") current_idea = None continue current_idea = None for orig in original_ideas: orig_title = orig.get('title', '') if (title in orig_title or orig_title in title or len(set(title.split()) & set(orig_title.split())) > 2): current_idea = orig.copy() current_idea['rank'] = rank logging.debug(f"Matched with original: {orig_title}") break if not current_idea: current_idea = { 'title': title, 'rank': rank, 'category': 'Unclassified', 'technical_problem': '', 'solution_means': '', 'technical_effects': '', 'combinations': ['Element 1', 'Element 2'] } logging.debug(f"Created new idea: {title}") continue if current_idea: if 'Differentiation from prior art:' in line or 'differentiation' in line.lower(): current_idea['prior_art_differentiation'] = line.split(":", 1)[1].strip() if ":" in line else line elif 'Technical configuration:' in line or 'configuration' in line.lower(): current_idea['technical_configuration'] = line.split(":", 1)[1].strip() if ":" in line else line elif 'Innovation assessment:' in line or 'innovation' in line.lower(): current_idea['innovation_assessment'] = line.split(":", 1)[1].strip() if ":" in line else line elif 'Marketability:' in line or 'market' in line.lower(): current_idea['marketability'] = line.split(":", 1)[1].strip() if ":" in line else line if current_idea and current_idea.get('title'): title = current_idea['title'] if len(title) > 5 and not any(kw in title.lower() for kw in prompt_keywords): organized.append(current_idea) logging.info(f"Added last organized idea: {title}") logging.info(f"Total organized ideas: {len(organized)}") if len(organized) < len(original_ideas) * 0.5: logging.warning(f"Too few organized ideas ({len(organized)}), using original 
ideas") for i, idea in enumerate(original_ideas[:100], 1): idea_copy = idea.copy() idea_copy['rank'] = i idea_copy['prior_art_differentiation'] = "Differentiated configuration from prior art" idea_copy['technical_configuration'] = idea.get('solution_means', '') idea_copy['innovation_assessment'] = "Medium" idea_copy['marketability'] = "Various application fields" organized.append(idea_copy) return organized[:100] def format_ideas_for_patent_evaluation(ideas: list) -> str: """Format ideas for patent evaluation""" formatted = [] for i, idea in enumerate(ideas, 1): formatted.append( f"{i}. **{idea['title']}**\n" f" - Category: {idea['category']}\n" f" - Technical Problem: {idea.get('technical_problem', '')}\n" f" - Solution Means: {idea.get('solution_means', '')}\n" f" - Technical Effects: {idea.get('technical_effects', '')}\n" f" - Prior Art Differentiation: {idea.get('prior_art_differentiation', '')}\n" f" - Technical Configuration: {idea.get('technical_configuration', '')}\n" ) return "\n\n".join(formatted) def parse_patent_evaluated_ideas(response: str, original_ideas: list) -> list: """Parse evaluated ideas from patent critic response""" evaluated = [] prompt_keywords = [ 'evaluation criteria', 'weight', 'output format', 'instruction', 'review', 'analysis', 'investigation', 'strategy', 'next steps', 'practice', 'prompt', 'example', 'format', 'claims', 'prototype', 'market' ] sections = re.split(r'###?\s*Idea:', response) logging.info(f"Found {len(sections)} potential idea sections") for section in sections[1:]: lines = section.strip().split('\n') if not lines: continue title_line = lines[0].strip().replace("**", "") if any(kw in title_line.lower() for kw in prompt_keywords): logging.debug(f"Skipping prompt-like section: {title_line[:50]}") continue if len(title_line) < 5: continue title = title_line matched_idea = None for orig in original_ideas: orig_title = orig.get('title', '') if (title in orig_title or orig_title in title or len(set(title.split()) & 
def extract_score_safe(text: str, keywords: list) -> int:
    """Extract a 0-10 score that follows one of the given keywords.

    For each keyword, three formats are tried in order:
    ``<keyword>: N/10``, ``<keyword>: N points`` and ``- <keyword>: N``.
    Matching is case-insensitive. Colons, whitespace and Markdown emphasis
    markers (``*``, ``_``) may appear between the keyword and the number, so
    ``**Novelty**: 8/10`` is recognised as well as ``Novelty: 8/10``.

    Args:
        text: Text to search (for example one evaluated idea section).
        keywords: Labels to look for; they are treated as literal text.

    Returns:
        The first score found that lies in the range 0-10, or 0 when no
        valid score is present (including empty ``text``).
    """
    if not text:
        return 0

    # Characters tolerated between the label and its number.
    gap = r"[:\s*_]*"

    for keyword in keywords:
        # Escape so that labels containing regex metacharacters stay literal.
        label = re.escape(keyword)
        patterns = (
            rf"{label}{gap}(\d+)/10",
            rf"{label}{gap}(\d+) points",
            rf"-[\s*_]*{label}{gap}(\d+)",
        )
        for pattern in patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                score = int(match.group(1))
                # Out-of-range numbers are ignored and the next format is tried.
                if 0 <= score <= 10:
                    return score
    return 0
{html_content}
""" def process_example(topic): """Process example topics""" process_input_with_soma(topic, []) def process_input_with_soma(prompt: str, uploaded_files): """Main SOMA process""" if not any(m["role"] == "user" and m["content"] == prompt for m in st.session_state.messages): st.session_state.messages.append({"role": "user", "content": prompt}) with st.chat_message("user"): st.markdown(prompt) for i in range(len(st.session_state.messages) - 1): if (st.session_state.messages[i]["role"] == "user" and st.session_state.messages[i]["content"] == prompt and st.session_state.messages[i + 1]["role"] == "assistant"): return with st.chat_message("assistant"): team_container, displays = create_enhanced_team_display() status = st.status("Initializing Patent-Focused AI team collaboration...") full_response = "" try: soma = SOMASystem(FIREWORKS_API_KEY) full_response = "" all_ideas = [] organized_ideas = [] evaluated_ideas = [] top_50_ideas = [] avg_novelty = 0 avg_inventive = 0 avg_industrial = 0 img_data = None img_caption = None if soma.test_mode: st.warning("⚠️ Running in test mode - no API key provided") test_response = f"[TEST MODE] Generated test patent ideas for: {prompt}\n\n" test_response += "## Test Patent Ideas\n\n" for i in range(5): test_response += f"{i+1}. 
Test Patent Idea {i+1}\n" test_response += f" - Category: Test Category\n" test_response += f" - Technical Problem: Test problem\n" test_response += f" - Solution: Test solution\n\n" st.markdown(test_response) st.session_state.messages.append({"role": "assistant", "content": test_response}) status.update(label="Test mode completed", state="complete") return selected_cat = st.session_state.get("category_focus", None) selected_frameworks = st.session_state.get("selected_frameworks", []) purpose_info = identify_decision_purpose(prompt) relevance_scores = compute_relevance_scores(prompt, PHYS_CATEGORIES) T = st.session_state.temp if T < 1.0: depth_range = (1, 1) elif T < 2.0: depth_range = (1, 2) else: depth_range = (1, 3) k_cat_range = (8, 12) n_item_range = (6, 10) combos = generate_random_comparison_matrix( PHYS_CATEGORIES, relevance_scores, k_cat=k_cat_range, n_item=n_item_range, depth_range=depth_range, seed=hash(prompt) & 0xFFFFFFFF, T=T, ) update_process_progress(1, 4, displays) web_search_results = "" if st.session_state.web_search_enabled: status.update(label="Phase 1: Searching for prior art and existing technologies...") display_streaming_progress("search", displays, "Searching web for prior art...", {"current": 0, "total": 3, "phase": "Prior Art Search"}) try: patent_keywords = f"{prompt} patent invention technology" prior_art_keywords = f"{prompt} existing products prior art" search_results = [] result1 = do_web_search(patent_keywords) search_results.append(result1) display_streaming_progress("search", displays, f"Found patent information:\n{result1[:500]}...", {"current": 1, "total": 3, "phase": "Prior Art Search"}) result2 = do_web_search(prior_art_keywords) search_results.append(result2) web_search_results = "\n\n".join(search_results) except Exception as e: logging.error("process_input error", exc_info=True) st.error(f"⚠️ An error occurred: {e}") combined_search_results = web_search_results display_streaming_progress("search", displays, 
combined_search_results, {"label": "Prior Art Found", "value": "✓", "delta": "Complete", "phase": "Prior Art Search"}, is_complete=True) web_search_results = combined_search_results design_context = { 'categories': PHYS_CATEGORIES, 'relevance_scores': relevance_scores, 'web_search_results': web_search_results, 'purpose_info': str(purpose_info), 'combinations': combos[:50], 'temperature': T, 'language': st.session_state.language } file_content = "" if uploaded_files: file_content = process_uploaded_files(uploaded_files) update_process_progress(2, 4, displays) status.update(label="Phase 2: Comprehensive research across all categories...") prompts = create_enhanced_design_prompts(soma, prompt, design_context, st.session_state.language) researcher_prompt = prompts["researcher_comprehensive"] if file_content: researcher_prompt += "\n\nUploaded file content:\n" + file_content[:2000] research_response = "" last_update_time = time.time() chunk_accumulator = "" display_streaming_progress("research", displays, "", {"current": 0, "total": "100-150", "phase": "Research"}) try: stream_complete = False timeout_count = 0 max_timeouts = 3 for chunk in soma.call_llm_streaming( [{"role": "user", "content": researcher_prompt}], "researcher", max_tokens=15000, language=st.session_state.language ): if chunk is None: timeout_count += 1 if timeout_count >= max_timeouts: logging.warning("Max timeouts reached in research phase") break continue timeout_count = 0 chunk_accumulator += chunk current_time = time.time() if current_time - last_update_time > 0.5 or len(chunk_accumulator) > 500: research_response += chunk_accumulator chunk_accumulator = "" idea_count = 0 idea_count += research_response.count("**Idea") idea_count += research_response.count("1. **") idea_count += research_response.count("2. **") idea_count += research_response.count("3. 
**") display_streaming_progress("research", displays, research_response, {"current": idea_count, "total": "100-150", "phase": "Research"}) last_update_time = current_time stream_complete = True if chunk_accumulator: research_response += chunk_accumulator all_ideas = parse_ideas_from_response(research_response) if not all_ideas: logging.warning("No ideas parsed from research response. Creating fallback ideas.") for i, cat in enumerate(PHYS_CATEGORIES[:5]): all_ideas.append({ "title": f"Idea {i+1} for {prompt[:30]}", "category": cat.name_en, "description": f"Applying {cat.name_en} principles", "technical_problem": "To be defined", "solution_means": "Physical transformation approach", "technical_effects": "Enhanced functionality", "feasibility": "Medium" }) display_streaming_progress("research", displays, all_ideas, {"label": "Ideas Collected", "value": str(len(all_ideas)), "delta": "✓ Complete", "phase": "Research"}, is_complete=True) except Exception as e: logging.error(f"Error in Phase 2: {str(e)}", exc_info=True) st.error(f"Error in research phase: {str(e)}") all_ideas = [ { "title": f"Fallback Idea {i+1}", "category": PHYS_CATEGORIES[i % len(PHYS_CATEGORIES)].name_en, "description": "Error recovery idea", "technical_problem": "To be resolved", "solution_means": "Alternative approach", "technical_effects": "To be determined", "feasibility": "High" } for i in range(10) ] update_process_progress(3, 4, displays) status.update(label="Phase 3: Organizing with prior art analysis...") supervisor_prompt = prompts["supervisor_organize_with_search"] supervisor_prompt = supervisor_prompt.replace("{idea_count}", str(len(all_ideas))) supervisor_prompt += f"\n\nCollected ideas:\n{format_ideas_for_processing(all_ideas)}" supervisor_response = "" display_streaming_progress("organize", displays, "", {"current": 0, "total": "80-100", "phase": "Organization"}) try: for chunk in soma.call_llm_streaming( [{"role": "user", "content": supervisor_prompt}], "supervisor", max_tokens=12000, 
language=st.session_state.language ): if chunk: supervisor_response += chunk organized_count = supervisor_response.count("Rank") + supervisor_response.count("rank") display_streaming_progress("organize", displays, supervisor_response, {"current": organized_count, "total": "80-100", "phase": "Organization"}) organized_ideas = parse_organized_ideas(supervisor_response, all_ideas) if not organized_ideas: organized_ideas = all_ideas[:100] display_streaming_progress("organize", displays, organized_ideas, {"label": "Ideas Organized", "value": str(len(organized_ideas)), "delta": "✓ Refined", "phase": "Organization"}, is_complete=True) except Exception as e: logging.error(f"Error in organization phase: {str(e)}") organized_ideas = all_ideas[:100] if all_ideas else [] update_process_progress(4, 4, displays) status.update(label="Phase 4: Patent-focused critical evaluation...") if organized_ideas: critic_prompt = create_critic_evaluation_prompt(organized_ideas, prompt, st.session_state.language) critic_prompt += f"\n\nIdeas to evaluate:\n{format_ideas_for_patent_evaluation(organized_ideas)}" critic_response = "" display_streaming_progress("evaluate", displays, "", {"current": 0, "total": str(len(organized_ideas)), "phase": "Evaluation"}) try: for chunk in soma.call_llm_streaming( [{"role": "user", "content": critic_prompt}], "critic", max_tokens=15000, language=st.session_state.language ): if chunk: critic_response += chunk eval_count = critic_response.count("Overall Patentability") display_streaming_progress("evaluate", displays, critic_response, {"current": eval_count, "total": str(len(organized_ideas)), "phase": "Evaluation"}) evaluated_ideas = parse_patent_evaluated_ideas(critic_response, organized_ideas) if not evaluated_ideas: evaluated_ideas = organized_ideas evaluated_ideas.sort(key=lambda x: x.get('patent_score', x.get('score', 0)), reverse=True) top_50_ideas = evaluated_ideas[:50] display_streaming_progress("evaluate", displays, top_50_ideas, {"label": "Top 
Patents", "value": str(len(top_50_ideas)), "delta": f"from {len(evaluated_ideas)}", "phase": "Evaluation"}, is_complete=True) except Exception as e: logging.error(f"Error in evaluation phase: {str(e)}") evaluated_ideas = organized_ideas top_50_ideas = organized_ideas[:50] else: evaluated_ideas = all_ideas[:50] top_50_ideas = all_ideas[:50] if not top_50_ideas: top_50_ideas = [{ "title": "Error: No ideas could be generated", "category": "System Error", "technical_problem": "System failed to generate ideas", "solution_means": "Please try again with a different prompt", "technical_effects": "N/A", "patent_score": 0, "novelty_score": 0, "inventive_score": 0, "industrial_score": 0 }] status.update(label="Generating final patent report...") full_response = format_final_output(top_50_ideas, all_ideas, organized_ideas, st.session_state.language) if not full_response: full_response = "# Patent Analysis Results\n\nNo results could be generated. Please try again." if top_50_ideas and top_50_ideas[0].get('title') != "Error: No ideas could be generated": avg_novelty = sum(idea.get('novelty_score', 0) for idea in top_50_ideas) / len(top_50_ideas) avg_inventive = sum(idea.get('inventive_score', 0) for idea in top_50_ideas) / len(top_50_ideas) avg_industrial = sum(idea.get('industrial_score', 0) for idea in top_50_ideas) / len(top_50_ideas) st.markdown("---") final_output_container = st.container() with final_output_container: st.markdown("## 📋 Final Patent Analysis Results") st.markdown(full_response) if st.session_state.generate_image and top_50_ideas and IMAGE_API_URL: if top_50_ideas[0].get('title') != "Error: No ideas could be generated": top_idea = top_50_ideas[0] image_prompt = f"Patent diagram visualization: {top_idea.get('title', '')} - {top_idea.get('solution_means', '')[:100]}" with st.spinner(f"Generating patent visualization..."): try: img_data, img_caption = generate_image(image_prompt) if img_data: st.image(img_data, caption=f"Patent Visualization: 
{top_idea.get('title', '')}") except Exception as img_err: logging.error(f"Image generation error: {str(img_err)}") status.update(label="Patent analysis completed!", state="complete") with displays["summary"]: st.markdown("### 📊 Patent Process Summary") col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Initial Ideas", len(all_ideas), "Generated") with col2: st.metric("After Prior Art Check", len(organized_ideas), f"-{max(0, len(all_ideas) - len(organized_ideas))}") with col3: st.metric("Patentable Ideas", len(evaluated_ideas), "Evaluated") with col4: st.metric("Final Patents", len(top_50_ideas), "Selected") if top_50_ideas and top_50_ideas[0].get('title') != "Error: No ideas could be generated": st.markdown("#### Patent Category Distribution in Top 50") category_counts = {} for idea in top_50_ideas: cat = idea.get('category', 'Unknown') category_counts[cat] = category_counts.get(cat, 0) + 1 sorted_cats = sorted(category_counts.items(), key=lambda x: x[1], reverse=True) cat_display = " | ".join([f"**{cat}**: {count}" for cat, count in sorted_cats[:5]]) st.markdown(cat_display) st.markdown("#### Average Patentability Scores") col1, col2, col3 = st.columns(3) col1.metric("Avg. Novelty", f"{avg_novelty:.1f}/10") col2.metric("Avg. Inventive Step", f"{avg_inventive:.1f}/10") col3.metric("Avg. 
Industrial App.", f"{avg_industrial:.1f}/10") answer_msg = {"role": "assistant", "content": full_response} if img_data: answer_msg["image"] = img_data answer_msg["image_caption"] = img_caption st.session_state.messages.append(answer_msg) if full_response: st.subheader("Download Patent Report") col_md, col_json = st.columns(2) safe_prompt = re.sub(r'[<>:"/\\|?*]', '_', prompt[:30]) col_md.download_button( "📄 Download as Markdown", data=full_response, file_name=f"patent_ideas_{safe_prompt}.md", mime="text/markdown", key=f"download_md_{datetime.now().timestamp()}" ) patent_data = { "query": prompt, "timestamp": datetime.now().isoformat(), "top_50_patents": top_50_ideas if (top_50_ideas and top_50_ideas[0].get('title') != "Error: No ideas could be generated") else [], "statistics": { "total_ideas_generated": len(all_ideas), "ideas_after_prior_art": len(organized_ideas), "patentable_ideas": len(evaluated_ideas), "avg_novelty_score": avg_novelty, "avg_inventive_score": avg_inventive, "avg_industrial_score": avg_industrial } } col_json.download_button( "📊 Download Patent Data (JSON)", data=json.dumps(patent_data, ensure_ascii=False, indent=2), file_name=f"patent_data_{safe_prompt}.json", mime="application/json", key=f"download_json_{datetime.now().timestamp()}" ) if st.session_state.auto_save: fn = f"patent_ideas_{safe_prompt}_{datetime.now():%Y%m%d_%H%M%S}.json" try: with open(fn, "w", encoding="utf-8") as fp: json.dump(patent_data, fp, ensure_ascii=False, indent=2) st.success(f"Auto-saved to: {fn}") except Exception as save_err: st.warning(f"Auto-save failed: {save_err}") except Exception as e: logging.error("process_input error", exc_info=True) st.error(f"⚠️ An error occurred: {e}") if not full_response: full_response = f"⚠️ Error occurred: {str(e)}\n\nPlease try again with a different prompt." 
st.session_state.messages.append( {"role": "assistant", "content": full_response} ) status.update(label="Error occurred", state="error") # ============================================================================ # ENHANCED SOMA PROCESS # ============================================================================ def process_input_with_enhanced_soma(prompt: str, uploaded_files): """Process input using Enhanced SOMA System""" if not any(m["role"] == "user" and m["content"] == prompt for m in st.session_state.messages): st.session_state.messages.append({"role": "user", "content": prompt}) with st.chat_message("user"): st.markdown(prompt) for i in range(len(st.session_state.messages) - 1): if (st.session_state.messages[i]["role"] == "user" and st.session_state.messages[i]["content"] == prompt and st.session_state.messages[i + 1]["role"] == "assistant"): return with st.chat_message("assistant"): team_container, displays = create_enhanced_team_display() status = st.status("Initializing Enhanced SOMA Team Collaboration...") try: enhanced_soma = EnhancedSOMASystem(FIREWORKS_API_KEY) if enhanced_soma.test_mode: st.warning("⚠️ Running in test mode") test_response = "Enhanced SOMA test mode - collaboration simulation" st.markdown(test_response) st.session_state.messages.append({"role": "assistant", "content": test_response}) status.update(label="Test mode completed", state="complete") return update_process_progress(1, 5, displays) web_search_results = "" if st.session_state.web_search_enabled: status.update(label="Phase 1: Prior Art Search...") display_streaming_progress("search", displays, "Searching...", {"phase": "Prior Art Search"}) try: search_results = [] patent_keywords = f"{prompt} patent invention" result1 = do_web_search(patent_keywords) search_results.append(result1) web_search_results = "\n\n".join(search_results) display_streaming_progress("search", displays, web_search_results, {"label": "Prior Art", "value": "✓"}, is_complete=True) except Exception as e: 
logging.error(f"Search error: {e}") relevance_scores = compute_relevance_scores(prompt, PHYS_CATEGORIES) combos = generate_random_comparison_matrix( PHYS_CATEGORIES, relevance_scores, depth_range=(2,3), seed=hash(prompt) & 0xFFFFFFFF, T=st.session_state.temp ) # Create JSON serializable initial_context initial_context = { 'task': prompt, 'categories': PHYS_CATEGORIES, 'relevance_scores': { f"{k[0]}-{k[1]}": v for k, v in list(relevance_scores.items())[:20] }, 'combinations': combos[:20], 'web_search_results': web_search_results[:3000], 'temperature': st.session_state.temp, 'language': st.session_state.language } update_process_progress(2, 5, displays) status.update(label="Phase 2-4: Multi-Round Team Collaboration...") display_streaming_progress("research", displays, "Starting autonomous collaboration...", {"phase": "Collaboration"}) collaboration_result = enhanced_soma.autonomous_collaboration( task=prompt, initial_data=initial_context, max_rounds=3, min_consensus=0.8, language=st.session_state.language ) collab_summary = f""" ## Collaboration Complete - **Collaboration Rounds**: {collaboration_result.collaboration_rounds} rounds - **Consensus Score**: {collaboration_result.consensus_score:.1%} - **Fact Checks**: {len(collaboration_result.fact_checks)} checks - **Agent Contributions**: {sum(len(v) for v in collaboration_result.agent_contributions.values())} contributions {collaboration_result.final_output[:1000]}... 
""" display_streaming_progress("organize", displays, collab_summary, {"label": "Collaboration", "value": "Complete"}, is_complete=True) update_process_progress(4, 5, displays) status.update(label="Phase 5: Final Patent Evaluation...") final_ideas = parse_collaboration_output(collaboration_result.final_output) display_streaming_progress("evaluate", displays, final_ideas, {"label": "Patents", "value": str(len(final_ideas))}, is_complete=True) update_process_progress(5, 5, displays) status.update(label="Generating final report...") full_response = format_enhanced_final_output( collaboration_result, final_ideas, st.session_state.language ) st.session_state.messages.append({ "role": "assistant", "content": full_response }) final_output_container = st.container() with final_output_container: st.markdown("## 📋 Final Collaboration Results") st.markdown(full_response) st.markdown("### 📊 Collaboration Statistics") col1, col2, col3, col4 = st.columns(4) col1.metric("Collaboration Rounds", collaboration_result.collaboration_rounds) col2.metric("Consensus Score", f"{collaboration_result.consensus_score:.1%}") col3.metric("Fact Checks", len(collaboration_result.fact_checks)) col4.metric("Final Patents", len(final_ideas)) if collaboration_result.fact_checks: with st.expander("🔍 Fact Check Results", expanded=False): verified_count = sum(1 for fc in collaboration_result.fact_checks if fc['status'] == 'verified') st.write(f"**Verified**: {verified_count} items") for i, fc in enumerate(collaboration_result.fact_checks[:10], 1): status_icon = { 'verified': '✅', 'suspicious': '⚠️', 'error': '❌' }.get(fc['status'], '❓') st.write(f"{i}. 
def parse_collaboration_output(collab_output: str) -> List[Dict]:
    """Extract numbered idea titles from the "Final Results" section.

    The section starts at the text ``## Final Results`` and runs until the
    next level-2 Markdown heading (a line beginning with exactly ``##``) or
    the end of the text. Deeper headings such as ``###`` inside the section
    do not end it. Every ``N. **Title**`` item found in the section becomes
    one idea dict.

    Args:
        collab_output: Final text produced by the collaboration run.

    Returns:
        Up to 50 idea dicts with the title filled in, empty technical fields
        and a fixed ``patent_score`` of 80. Empty when the input is empty or
        has no "Final Results" section.
    """
    if not collab_output:
        return []

    # MULTILINE lets ``^`` anchor the terminating heading at a line start;
    # ``\Z`` (not ``$``) is used so that only the true end of text terminates.
    final_section = re.search(
        r'## Final Results(.+?)(?=^##(?!#)|\Z)',
        collab_output,
        re.DOTALL | re.MULTILINE,
    )
    if not final_section:
        return []

    titles = re.findall(r'\d+\.\s*\*\*(.+?)\*\*', final_section.group(1))
    return [
        {
            'title': title.strip(),
            'category': 'Collaboration Generated',
            'technical_problem': '',
            'solution_means': '',
            'technical_effects': '',
            'patent_score': 80
        }
        for title in titles[:50]
    ]
collaboration_result.fact_checks if fc['status'] == 'suspicious')} items Errors: {sum(1 for fc in collaboration_result.fact_checks if fc['status'] == 'error')} items ## 🏆 Final Collaboration Results {collaboration_result.final_output} ## 💡 Generated Patent Ideas TOP {len(final_ideas)} """ for i, idea in enumerate(final_ideas, 1): output += f""" ### {i}. {idea.get('title', 'Untitled')} - **Category**: {idea.get('category', 'Unclassified')} - **Patentability Score**: {idea.get('patent_score', 0)}/100 - **Technical Problem**: {idea.get('technical_problem', 'Not specified')[:100]}... - **Solution Means**: {idea.get('solution_means', 'Not specified')[:100]}... """ return output # Main application def idea_generator_app(): """Main application - with Enhanced SOMA option""" if 'initialized' not in st.session_state: st.session_state.initialized = True default_vals = { "language": "English", "ai_model": "gemma-3-r1984-27b", "messages": [], "auto_save": True, "generate_image": True, "web_search_enabled": True, "GLOBAL_PICK_COUNT": {}, "_skip_dup_idx": None, "temp": 1.3, "generated_specification": None, "specification_timestamp": None, "generated_drawings": {}, "selected_idea_for_spec": None } for k, v in default_vals.items(): if k not in st.session_state: st.session_state[k] = v st.title("🚀 AGI Patent : Inventor AI") st.markdown(""" ### Enhanced Patent-Focused Process This system transforms creative ideas into **patentable inventions** ready for filing. 
""") sb = st.sidebar language_choice = sb.radio( "Select Output Language", ["English"], index=0 ) st.session_state.language = language_choice st.session_state.temp = sb.slider( "Creativity Temperature", 0.1, 3.0, st.session_state.temp, 0.1, help="0.1 = Conservative, 3.0 = Highly creative" ) if st.session_state.temp > 2.0: sb.warning("⚡ High creativity mode: More radical patent ideas expected!") sb.markdown("---") sb.subheader("🤖 AI Collaboration Mode") use_enhanced_soma = sb.checkbox( "Enhanced Multi-Agent Collaboration", value=False, help="Autonomous agent collaboration, multi-round improvement, enhanced fact-checking" ) if use_enhanced_soma: sb.info(""" **Enhanced Mode Features:** - 🔄 Autonomous agent collaboration - 🎯 3-round iterative improvement - ✅ Real-time fact checking - 🤝 Consensus-based results """) sb.title("⚙️ Settings") sb.toggle("Auto Save", key="auto_save") sb.toggle("Auto Image Generation", key="generate_image") st.session_state.web_search_enabled = sb.toggle( "Use Web Search (Prior Art)", value=st.session_state.web_search_enabled ) if FIREWORKS_API_KEY: sb.success("✅ Patent AI System Ready") else: sb.warning("⚠️ Running in test mode") sb.markdown("---") sb.markdown(""" ### 📊 Patent Process 1. **Prior Art**: Web search 2. **Research**: ~150 ideas 3. **Organize**: ~100 ideas 4. **Evaluate**: TOP 50 patents **Patent Scores:** - Novelty (30%) - Inventive Step (30%) - Industrial App. 
(20%) - Specification (20%) """) sb.subheader("💡 Example Topics") c1, c2, c3 = sb.columns(3) if c1.button("Smart Cat Toy", key="ex1"): process_example("Invent a patentable smart cat toy that maintains long-term feline interest through unpredictable interaction patterns, minimal electronics, and self-sustaining mechanisms.") if c2.button("AI Companion", key="ex2"): process_example("Create an innovative AI companion device with adaptive learning, emotional intelligence, and privacy-preserving architecture suitable for patent filing.") if c3.button("Clean Water", key="ex3"): process_example("Design a revolutionary water purification system using minimal energy and sustainable materials for developing countries.") latest_ideas = next( (m["content"] for m in reversed(st.session_state.messages) if m["role"] == "assistant" and m["content"].strip()), None ) if latest_ideas: title_match = re.search(r"# (.*?)(\n|$)", latest_ideas) title = (title_match.group(1) if title_match else "patent_ideas").strip() sb.subheader("📥 Download Patent Report") d1, d2 = sb.columns(2) d1.download_button("📄 Markdown", latest_ideas, file_name=f"{title}.md", mime="text/markdown") d2.download_button("🌐 HTML", md_to_html(latest_ideas, title), file_name=f"{title}.html", mime="text/html") up = sb.file_uploader("Load Conversation (.json)", type=["json"], key="json_uploader") if up: try: st.session_state.messages = json.load(up) sb.success("Conversation history loaded.") except Exception as e: sb.error(f"Failed to load: {e}") if sb.button("Download Conversation as JSON"): sb.download_button( "Save JSON", data=json.dumps(st.session_state.messages, ensure_ascii=False, indent=2), file_name="patent_chat_history.json", mime="application/json" ) # Create two tabs tab1, tab2 = st.tabs(["💡 Idea Generation", "📄 Patent Specification"]) # ======================================================================== # TAB 1: IDEA GENERATION # ======================================================================== with 
tab1: st.subheader("📎 File Upload (Optional)") uploaded_files = st.file_uploader( "Upload reference files (txt, csv, pdf)", type=["txt", "csv", "pdf"], accept_multiple_files=True, key="file_uploader" ) if uploaded_files: st.success(f"{len(uploaded_files)} files uploaded") with st.expander("Preview Files", expanded=False): for idx, file in enumerate(uploaded_files): st.write(f"**File Name:** {file.name}") ext = file.name.split('.')[-1].lower() try: if ext == 'txt': preview = file.read(1000).decode('utf-8', errors='ignore') file.seek(0) st.text_area("Preview", preview + ("..." if len(preview) >= 1000 else ""), height=150, key=f"preview_txt_{idx}") elif ext == 'csv': df = pd.read_csv(file) file.seek(0) st.dataframe(df.head(5)) elif ext == 'pdf': reader = PyPDF2.PdfReader(io.BytesIO(file.read()), strict=False) file.seek(0) pg_txt = reader.pages[0].extract_text() if reader.pages else "(No text)" st.text_area("Preview", (pg_txt[:500] + "...") if pg_txt else "(No text)", height=150, key=f"preview_pdf_{idx}") except Exception as e: st.error(f"Preview failed: {e}") if idx < len(uploaded_files) - 1: st.divider() skip_idx = st.session_state.get("_skip_dup_idx") for i, m in enumerate(st.session_state.messages): if skip_idx is not None and i == skip_idx: continue with st.chat_message(m["role"]): st.markdown(m["content"]) if "image" in m: st.image(m["image"], caption=m.get("image_caption", "")) st.session_state["_skip_dup_idx"] = None prompt = st.chat_input("💭 Describe your invention challenge for patent-worthy ideas...") if prompt: if use_enhanced_soma: process_input_with_enhanced_soma(prompt, uploaded_files) else: process_input_with_soma(prompt, uploaded_files) # ======================================================================== # TAB 2: PATENT SPECIFICATION GENERATOR # ======================================================================== with tab2: st.subheader("📄 Patent Specification Generator") st.markdown(""" Generate a complete, filing-ready patent specification 
based on your generated ideas. The specification will include all required sections: technical field, background, problem statement, solution, effects, drawings description, detailed description, and claims. """) if not st.session_state.messages or len([m for m in st.session_state.messages if m["role"] == "assistant"]) == 0: st.info("💡 Please generate patent ideas in the 'Idea Generation' tab first.") else: # Find the most recent generated ideas latest_ideas = None for m in reversed(st.session_state.messages): if m["role"] == "assistant" and "Final TOP" in m["content"]: latest_ideas = m["content"] break if latest_ideas: st.success("✅ Patent ideas found. You can now generate a complete specification.") # Extract idea titles from the response idea_titles = [] idea_details = {} current_rank = None current_content = [] for line in latest_ideas.split('\n'): rank_match = re.match(r'^## Rank (\d+): (.+?) \(Patentability Score:', line) if rank_match: if current_rank and current_content: idea_details[idea_titles[-1]] = '\n'.join(current_content) current_rank = rank_match.group(1) title = rank_match.group(2).strip() idea_titles.append(title) current_content = [line] elif current_rank: current_content.append(line) if line.startswith('---'): idea_details[idea_titles[-1]] = '\n'.join(current_content) current_rank = None current_content = [] if idea_titles: col1, col2 = st.columns([3, 1]) with col1: selected_idea = st.selectbox( "Select an idea to generate patent specification:", idea_titles, key="spec_idea_selector", help="Choose the invention you want to create a full patent specification for" ) with col2: spec_language = st.selectbox( "Specification Language:", ["English", "Korean"], key="spec_language" ) col3, col4 = st.columns(2) with col3: include_drawings = st.checkbox( "Include Patent Drawings", value=True, key="include_drawings", help="Generate visual diagrams for the patent" ) with col4: num_claims = st.number_input( "Number of Claims:", min_value=5, max_value=20, 
value=10, key="num_claims", help="Total claims (independent + dependent)" ) st.markdown("---") # Show preview of selected idea with st.expander("📋 Preview Selected Idea Details", expanded=False): if selected_idea in idea_details: st.markdown(idea_details[selected_idea]) else: st.write("Details not available") if st.button("🚀 Generate Complete Patent Specification", type="primary", key="generate_spec", use_container_width=True): with st.spinner("🔄 Generating comprehensive patent specification..."): # Extract detailed information for the selected idea idea_section = idea_details.get(selected_idea, "") # Create specification generation prompt spec_prompt = f"""Generate a complete, professional patent specification for the following invention in {spec_language}: {idea_section} Create a comprehensive patent specification with the following structure: 1. **TITLE OF INVENTION** - Clear, concise title (10-15 words) 2. **TECHNICAL FIELD** - Specific technical domain - Related technologies - Classification codes (if applicable) 3. **BACKGROUND ART** - Prior art analysis (3-5 examples) - Problems with existing solutions - Market needs and gaps - Technical limitations of prior art 4. **PROBLEMS TO BE SOLVED** - Primary technical problem (detailed) - Secondary problems (2-3 items) - Quantitative metrics showing need 5. **MEANS FOR SOLVING PROBLEMS** - Core technical configuration (detailed) - Key mechanisms and components - Process flow and interactions - Novel elements and combinations - Technical advantages over prior art 6. **EFFECTS OF INVENTION** - Primary technical effects (quantified) - Secondary benefits - Industrial advantages - Economic benefits - Comparison with prior art (specific numbers/percentages) 7. **BRIEF DESCRIPTION OF DRAWINGS** - Figure 1: Overall system structure - Figure 2: Detailed component diagram - Figure 3: Process flow chart - Figure 4: Alternative embodiment - (Add more as needed) 8. 
**DETAILED DESCRIPTION OF EMBODIMENTS** **Embodiment 1: Primary Implementation** - Detailed technical description - Component specifications - Operating principles - Specific examples with parameters - Materials and dimensions **Embodiment 2: Alternative Implementation** - Variations and modifications - Different configurations - Alternative materials/methods **Embodiment 3: Advanced Implementation** - Enhanced features - Additional capabilities 9. **CLAIMS** (Generate {num_claims} claims total) **Independent Claims (3 claims):** - Claim 1: System/apparatus claim (broadest) - Claim 2: Method claim - Claim 3: Alternative system claim **Dependent Claims ({num_claims - 3} claims):** - Specific implementations - Preferred embodiments - Optional features - Alternative configurations IMPORTANT REQUIREMENTS: - Use precise technical language - Include specific parameters, ranges, and measurements - Provide concrete examples - Ensure claims are clear and enforceable - Make claims progressively narrower from independent to dependent - Avoid ambiguous terms - Include support for all claim elements in the description - Ensure consistency between description and claims Generate a complete, filing-ready patent specification.""" soma = SOMASystem(FIREWORKS_API_KEY) specification = "" spec_placeholder = st.empty() status_container = st.container() try: with status_container: status = st.status("📝 Generating patent specification...", expanded=True) progress_bar = st.progress(0) progress_text = st.empty() chunk_count = 0 total_estimated_chunks = 200 for chunk in soma.call_llm_streaming( [{"role": "user", "content": spec_prompt}], "creator", max_tokens=15000, language=spec_language ): if chunk: specification += chunk chunk_count += 1 progress = min(chunk_count / total_estimated_chunks, 0.99) progress_bar.progress(progress) # Estimate current section if "TECHNICAL FIELD" in specification and "BACKGROUND" not in specification: section = "Technical Field" elif "BACKGROUND" in 
specification and "PROBLEMS" not in specification: section = "Background Art" elif "PROBLEMS" in specification and "MEANS FOR" not in specification: section = "Problems to be Solved" elif "MEANS FOR" in specification and "EFFECTS" not in specification: section = "Solution Means" elif "EFFECTS" in specification and "BRIEF DESCRIPTION" not in specification: section = "Effects" elif "BRIEF DESCRIPTION" in specification and "DETAILED DESCRIPTION" not in specification: section = "Drawing Descriptions" elif "DETAILED DESCRIPTION" in specification and "CLAIMS" not in specification: section = "Detailed Description" elif "CLAIMS" in specification: section = "Claims" else: section = "Initializing" progress_text.text(f"Generating: {section} ({chunk_count} chunks)") spec_placeholder.markdown(specification) progress_bar.progress(1.0) status.update(label="✅ Patent specification generated successfully!", state="complete") st.session_state.generated_specification = specification st.session_state.specification_timestamp = datetime.now() st.session_state.selected_idea_for_spec = selected_idea st.success("✅ Patent specification generation completed!") # Generate patent drawings if requested if include_drawings and IMAGE_API_URL: st.markdown("---") st.subheader("📐 Generating Patent Drawings") drawing_prompts = [ f"Patent technical drawing: overall system architecture and structure of {selected_idea}, professional engineering diagram, labeled components, clean lines, patent illustration style", f"Patent technical drawing: detailed internal mechanism and component relationships of {selected_idea}, cross-section view, technical schematic, patent drawing style", f"Patent technical drawing: process flow diagram showing operation sequence of {selected_idea}, flowchart format, step-by-step illustration, patent documentation style", f"Patent technical drawing: alternative embodiment and variation of {selected_idea}, comparative diagram, technical illustration, patent figure style" ] 
drawing_cols = st.columns(2) for i, draw_prompt in enumerate(drawing_prompts, 1): col_idx = (i - 1) % 2 with drawing_cols[col_idx]: with st.spinner(f"Generating Figure {i}..."): try: img_data, _ = generate_image(draw_prompt) if img_data: st.image(img_data, caption=f"Figure {i}: {['System Structure', 'Detailed Mechanism', 'Process Flow', 'Alternative Embodiment'][i-1]}") st.session_state.generated_drawings[f"figure_{i}"] = img_data else: st.warning(f"Figure {i} generation returned no data") except Exception as e: st.error(f"Figure {i} generation failed: {str(e)}") if st.session_state.generated_drawings: st.success(f"✅ Generated {len(st.session_state.generated_drawings)} patent drawings") except Exception as e: st.error(f"❌ Error generating specification: {str(e)}") logging.error(f"Specification generation error: {str(e)}", exc_info=True) else: st.warning("⚠️ Could not extract idea titles from the generated content.") else: st.warning("⚠️ No patent ideas found. Please generate ideas in the 'Idea Generation' tab first.") # Display and download generated specification if st.session_state.generated_specification: st.markdown("---") st.subheader("📄 Generated Patent Specification") if st.session_state.selected_idea_for_spec: st.info(f"**Invention**: {st.session_state.selected_idea_for_spec}") if st.session_state.specification_timestamp: st.caption(f"Generated: {st.session_state.specification_timestamp.strftime('%Y-%m-%d %H:%M:%S')}") with st.expander("📖 View Complete Specification", expanded=True): st.markdown(st.session_state.generated_specification) # Download buttons st.subheader("💾 Download Options") col1, col2, col3, col4 = st.columns(4) safe_filename = re.sub(r'[<>:"/\\|?*]', '_', st.session_state.selected_idea_for_spec if st.session_state.selected_idea_for_spec else 'patent')[:30] with col1: st.download_button( "📄 Download Markdown", data=st.session_state.generated_specification, file_name=f"patent_spec_{safe_filename}.md", mime="text/markdown", 
key="download_spec_md", use_container_width=True ) with col2: html_content = md_to_html( st.session_state.generated_specification, f"Patent Specification: {safe_filename}" ) st.download_button( "🌐 Download HTML", data=html_content, file_name=f"patent_spec_{safe_filename}.html", mime="text/html", key="download_spec_html", use_container_width=True ) with col3: # Create DOCX version try: doc = Document() doc.add_heading(f"Patent Specification: {safe_filename}", 0) for line in st.session_state.generated_specification.split('\n'): if line.startswith('# '): doc.add_heading(line[2:], level=1) elif line.startswith('## '): doc.add_heading(line[3:], level=2) elif line.startswith('### '): doc.add_heading(line[4:], level=3) elif line.strip(): doc.add_paragraph(line) doc_buffer = io.BytesIO() doc.save(doc_buffer) doc_buffer.seek(0) st.download_button( "📝 Download DOCX", data=doc_buffer.getvalue(), file_name=f"patent_spec_{safe_filename}.docx", mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document", key="download_spec_docx", use_container_width=True ) except Exception as e: st.button("📝 DOCX (Error)", disabled=True, use_container_width=True) logging.error(f"DOCX generation error: {e}") with col4: if st.session_state.generated_drawings: st.info(f"📐 {len(st.session_state.generated_drawings)} Drawings") else: st.button("📐 No Drawings", disabled=True, use_container_width=True) # Statistics if st.session_state.generated_specification: st.markdown("---") st.subheader("📊 Specification Statistics") spec_text = st.session_state.generated_specification word_count = len(spec_text.split()) char_count = len(spec_text) line_count = len(spec_text.split('\n')) claims_match = re.search(r'CLAIMS|Claims', spec_text, re.IGNORECASE) claims_section = spec_text[claims_match.start():] if claims_match else "" num_claims_found = len(re.findall(r'\bClaim \d+', claims_section, re.IGNORECASE)) stat_col1, stat_col2, stat_col3, stat_col4 = st.columns(4) stat_col1.metric("Total Words", 
f"{word_count:,}") stat_col2.metric("Total Characters", f"{char_count:,}") stat_col3.metric("Total Lines", f"{line_count:,}") stat_col4.metric("Claims Detected", num_claims_found) # Sidebar footer sb.markdown("---") sb.markdown("Enhanced by [SOMA Technology](https://discord.gg/openfreeai)") sb.markdown("v4.2 - Complete Patent Specification System") with sb.expander("🔧 API Status", expanded=False): st.write("**System Status:**") if FIREWORKS_API_KEY: st.write("✅ Friendli AI: Connected") else: st.write("❌ Friendli AI: Not configured") if BRAVE_KEY: st.write("✅ Brave Search: Connected") else: st.write("❌ Brave Search: Not configured") if IMAGE_API_URL: st.write("✅ Image Generation: Connected") else: st.write("❌ Image Generation: Not configured") st.write(f"**Session Messages**: {len(st.session_state.messages)}") st.write(f"**Specification Generated**: {'Yes' if st.session_state.generated_specification else 'No'}") st.write(f"**Drawings Generated**: {len(st.session_state.generated_drawings)}") # Main execution if __name__ == "__main__": try: idea_generator_app() except Exception as e: logging.error(f"Application startup error: {str(e)}", exc_info=True) st.error(f"Application startup error: {str(e)}")