Spaces:
Sleeping
Sleeping
| """ | |
| Synthesis Agent | |
| Combines SEC filing analysis with market intelligence to generate final recommendation | |
| Uses Gemini API for intelligent synthesis when available, falls back to rule-based logic | |
| """ | |
| from typing import Dict, Any, Optional, List | |
| import logging | |
| import os | |
| from .base import BaseAgent | |
| from ..core.types import ( | |
| AnalysisRequest, | |
| InvestmentRecommendation, | |
| RiskFactor, | |
| EquityAnalysisResult, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Import Gemini if available | |
| try: | |
| import google.generativeai as genai | |
| GEMINI_AVAILABLE = True | |
| except ImportError: | |
| GEMINI_AVAILABLE = False | |
| logger.warning("Gemini not available, using rule-based synthesis") | |
| class SynthesisAgent(BaseAgent): | |
| """ | |
| Agent that synthesizes all analysis into actionable recommendation | |
| - Cross-references SEC fundamentals with market action | |
| - Identifies discrepancies (e.g., positive filings but negative price action) | |
| - Generates evidence-based recommendation | |
| """ | |
| def __init__(self): | |
| super().__init__( | |
| name="Chief Investment Strategist", | |
| role="Synthesize multi-source analysis into actionable investment insights", | |
| goal="Provide clear, evidence-based investment recommendations", | |
| ) | |
| # Initialize Gemini if available | |
| self.use_gemini = False | |
| self.model = None | |
| if GEMINI_AVAILABLE: | |
| api_key = os.getenv("GEMINI_API_KEY") | |
| if api_key: | |
| try: | |
| genai.configure(api_key=api_key) | |
| # Use Gemini 2.0 Flash (latest, fastest, has built-in caching) | |
| self.model = genai.GenerativeModel('gemini-2.0-flash-exp') | |
| self.use_gemini = True | |
| logger.info("Gemini 2.0 Flash initialized successfully") | |
| except Exception as e: | |
| logger.warning(f"Failed to initialize Gemini: {e}") | |
| else: | |
| logger.info("GEMINI_API_KEY not found, using rule-based synthesis") | |
| async def execute( | |
| self, request: AnalysisRequest, context: Optional[Dict[str, Any]] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Synthesize all analyses into final recommendation | |
| Args: | |
| request: Original analysis request | |
| context: Results from SEC and Market agents | |
| Returns: | |
| Final recommendation | |
| """ | |
| try: | |
| logger.info(f"Synthesis Agent generating recommendation for {request.ticker}") | |
| if not context: | |
| return { | |
| "status": "error", | |
| "message": "No context provided for synthesis", | |
| } | |
| # Extract previous agent results | |
| sec_result = context.get("sec_agent", {}) | |
| market_result = context.get("market_agent", {}) | |
| # Generate recommendation | |
| recommendation = self._generate_recommendation( | |
| request, sec_result, market_result | |
| ) | |
| # Create complete analysis result | |
| analysis_result = EquityAnalysisResult( | |
| request=request, | |
| sec_analysis=sec_result.get("analysis"), | |
| market_intelligence=market_result.get("intelligence"), | |
| recommendation=recommendation, | |
| metadata={ | |
| "sec_summary": sec_result.get("summary"), | |
| "market_summary": market_result.get("summary"), | |
| }, | |
| ) | |
| return { | |
| "status": "success", | |
| "recommendation": recommendation, | |
| "analysis_result": analysis_result, | |
| "summary": self._create_summary(recommendation), | |
| } | |
| except Exception as e: | |
| import traceback | |
| logger.error(f"Error in Synthesis agent: {str(e)}") | |
| logger.error(f"Full traceback: {traceback.format_exc()}") | |
| return {"status": "error", "message": str(e)} | |
| def _generate_recommendation( | |
| self, | |
| request: AnalysisRequest, | |
| sec_result: Dict, | |
| market_result: Dict, | |
| ) -> InvestmentRecommendation: | |
| """Generate investment recommendation based on all analyses""" | |
| # Extract key data | |
| sec_insights = sec_result.get("insights", {}) | |
| market_insights = market_result.get("insights", {}) | |
| # Use Gemini if available, otherwise fall back to rule-based | |
| if self.use_gemini: | |
| try: | |
| return self._generate_with_gemini( | |
| request, sec_insights, market_insights | |
| ) | |
| except Exception as e: | |
| logger.warning(f"Gemini synthesis failed, falling back to rules: {e}") | |
| import traceback | |
| logger.error(f"Full traceback: {traceback.format_exc()}") | |
| # Fall through to rule-based logic | |
| # Rule-based logic (fallback or default) | |
| # Determine overall sentiment | |
| sentiment = self._determine_sentiment(sec_insights, market_insights) | |
| confidence = self._determine_confidence(sec_insights, market_insights) | |
| # Identify risks | |
| risks = self._identify_risks(sec_insights, market_insights) | |
| # Identify opportunities | |
| opportunities = self._identify_opportunities(sec_insights, market_insights) | |
| # Generate recommended action | |
| action = self._generate_action(sentiment, confidence, risks) | |
| # Build reasoning | |
| reasoning = self._build_reasoning( | |
| request.ticker, sentiment, sec_insights, market_insights, risks, opportunities | |
| ) | |
| return InvestmentRecommendation( | |
| ticker=request.ticker, | |
| sentiment=sentiment, | |
| confidence=confidence, | |
| key_risks=risks, | |
| key_opportunities=opportunities, | |
| recommended_action=action, | |
| reasoning=reasoning, | |
| ) | |
| def _generate_with_gemini( | |
| self, | |
| request: AnalysisRequest, | |
| sec_insights: Dict, | |
| market_insights: Dict, | |
| ) -> InvestmentRecommendation: | |
| """Generate recommendation using Gemini API""" | |
| # Build comprehensive prompt with all analysis data | |
| prompt = f"""You are a Chief Investment Strategist analyzing equity {request.ticker}. | |
| **SEC Filing Analysis (FinBERT/SEC-BERT sentiment analysis):** | |
| - Overall Sentiment: {sec_insights.get('overall_sentiment', 'N/A').upper()} | |
| - Confidence: {sec_insights.get('confidence', 0):.2%} | |
| Component Analysis: | |
| """ | |
| # Add component sentiments | |
| components = sec_insights.get('components', {}) | |
| if isinstance(components, dict): | |
| for comp, data in components.items(): | |
| if isinstance(data, dict): | |
| sentiment = data.get('sentiment', 'N/A') | |
| confidence = data.get('confidence', 0) | |
| prompt += f"- {comp.replace('_', ' ').title()}: {str(sentiment).upper()} ({confidence:.2%})\n" | |
| # Add identified risks from SEC filings | |
| prompt += "\nKey Risk Factors from SEC Filings:\n" | |
| key_risks = sec_insights.get('key_risks', []) | |
| if isinstance(key_risks, list): | |
| for risk in key_risks[:5]: | |
| if isinstance(risk, dict): | |
| prompt += f"- {risk.get('phrase', 'N/A')}\n" | |
| elif isinstance(risk, str): | |
| prompt += f"- {risk}\n" | |
| # Add opportunities from SEC filings | |
| prompt += "\nKey Opportunities from SEC Filings:\n" | |
| key_opps = sec_insights.get('key_opportunities', []) | |
| if isinstance(key_opps, list): | |
| for opp in key_opps[:5]: | |
| if isinstance(opp, dict): | |
| prompt += f"- {opp.get('phrase', 'N/A')} ({opp.get('component', 'N/A')})\n" | |
| elif isinstance(opp, str): | |
| prompt += f"- {opp}\n" | |
| # Add market intelligence | |
| prompt += f""" | |
| **Market Intelligence:** | |
| - Price Trend: {market_insights.get('price_trend', 'N/A')} | |
| - News Sentiment: {market_insights.get('news_sentiment', 'N/A')} | |
| Technical Signals: | |
| """ | |
| tech_signals = market_insights.get('technical_signals', {}) | |
| if isinstance(tech_signals, dict): | |
| for indicator, signal in tech_signals.items(): | |
| prompt += f"- {indicator.upper()}: {signal}\n" | |
| # Add notable events | |
| notable_events = market_insights.get('notable_events', []) | |
| if isinstance(notable_events, list) and notable_events: | |
| prompt += "\nNotable News Events:\n" | |
| for event in notable_events[:3]: | |
| if isinstance(event, str): | |
| prompt += f"- {event}\n" | |
| # Request structured output | |
| prompt += """ | |
| **Your Task:** | |
| Synthesize the above fundamental analysis (SEC filings) and market intelligence into a comprehensive investment recommendation. | |
| IMPORTANT: Return ONLY valid JSON. No markdown, no code blocks, no extra text. Use spaces instead of tabs or special characters. | |
| { | |
| "sentiment": "BULLISH or BEARISH or NEUTRAL", | |
| "confidence": "HIGH or MEDIUM or LOW", | |
| "risks": [ | |
| {"category": "Fundamental or Technical or Sentiment", "description": "single line description", "severity": "high or medium or low", "evidence": ["brief evidence point"]} | |
| ], | |
| "opportunities": ["opportunity 1", "opportunity 2"], | |
| "action": "STRONG BUY or BUY or HOLD or SELL or AVOID - brief rationale", | |
| "reasoning": "2-3 paragraph analysis with no special formatting or line breaks within paragraphs" | |
| } | |
| """ | |
| # Call Gemini with JSON response mode | |
| logger.info(f"Calling Gemini 2.0 Flash for {request.ticker} synthesis...") | |
| # Configure for JSON output | |
| generation_config = { | |
| "response_mime_type": "application/json" | |
| } | |
| response = self.model.generate_content( | |
| prompt, | |
| generation_config=generation_config | |
| ) | |
| # Parse response | |
| import json | |
| import re | |
| response_text = response.text.strip() | |
| # Log raw response for debugging | |
| logger.debug(f"Gemini raw response (first 500 chars): {response_text[:500]}") | |
| # Extract JSON from markdown code blocks if present | |
| if "```json" in response_text: | |
| response_text = response_text.split("```json")[1].split("```")[0].strip() | |
| elif "```" in response_text: | |
| response_text = response_text.split("```")[1].split("```")[0].strip() | |
| # Simple approach: Remove ALL newlines and tabs from JSON | |
| # JSON doesn't need pretty-printing to be valid | |
| response_text = response_text.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ') | |
| # Replace multiple spaces with single space | |
| response_text = re.sub(r'\s+', ' ', response_text) | |
| # Remove non-printable control characters (but we already removed \n, \r, \t) | |
| response_text = ''.join( | |
| char if char.isprintable() or char == ' ' else ' ' | |
| for char in response_text | |
| ) | |
| try: | |
| result = json.loads(response_text) | |
| except json.JSONDecodeError as e: | |
| # Log the problematic JSON for debugging | |
| logger.error(f"JSON decode error: {e}") | |
| logger.error(f"Problematic JSON (first 1000 chars): {response_text[:1000]}") | |
| raise | |
| # Convert to InvestmentRecommendation | |
| risks = [ | |
| RiskFactor( | |
| category=risk.get("category", "Unknown"), | |
| description=risk.get("description", ""), | |
| severity=risk.get("severity", "medium"), | |
| evidence=risk.get("evidence", []), | |
| ) | |
| for risk in result.get("risks", []) | |
| ] | |
| return InvestmentRecommendation( | |
| ticker=request.ticker, | |
| sentiment=result.get("sentiment", "NEUTRAL"), | |
| confidence=result.get("confidence", "MEDIUM"), | |
| key_risks=risks, | |
| key_opportunities=result.get("opportunities", []), | |
| recommended_action=result.get("action", "HOLD - Insufficient data"), | |
| reasoning=result.get("reasoning", "No reasoning provided"), | |
| ) | |
| def _determine_sentiment( | |
| self, sec_insights: Dict, market_insights: Dict | |
| ) -> str: | |
| """Determine overall sentiment (BULLISH/BEARISH/NEUTRAL)""" | |
| # SEC sentiment | |
| sec_sentiment = sec_insights.get("overall_sentiment", "neutral") | |
| # Market sentiment | |
| price_trend = market_insights.get("price_trend", "NEUTRAL") | |
| news_sentiment = market_insights.get("news_sentiment", "NEUTRAL") | |
| # Weighted decision | |
| bullish_signals = 0 | |
| bearish_signals = 0 | |
| # SEC analysis (higher weight) | |
| if sec_sentiment == "positive": | |
| bullish_signals += 2 | |
| elif sec_sentiment == "negative": | |
| bearish_signals += 2 | |
| # Price trend | |
| if price_trend == "BULLISH": | |
| bullish_signals += 1 | |
| elif price_trend == "BEARISH": | |
| bearish_signals += 1 | |
| # News sentiment | |
| if news_sentiment == "POSITIVE": | |
| bullish_signals += 1 | |
| elif news_sentiment == "NEGATIVE": | |
| bearish_signals += 1 | |
| # Determine overall | |
| if bullish_signals > bearish_signals + 1: | |
| return "BULLISH" | |
| elif bearish_signals > bullish_signals + 1: | |
| return "BEARISH" | |
| else: | |
| return "NEUTRAL" | |
| def _determine_confidence( | |
| self, sec_insights: Dict, market_insights: Dict | |
| ) -> str: | |
| """Determine confidence level (HIGH/MEDIUM/LOW)""" | |
| sec_confidence = sec_insights.get("confidence", 0) | |
| has_market_data = bool(market_insights.get("technical_signals")) | |
| has_news = bool(market_insights.get("notable_events")) | |
| # High confidence if SEC is confident and market confirms | |
| if sec_confidence > 0.7 and has_market_data and has_news: | |
| return "HIGH" | |
| # Low confidence if missing data or conflicting signals | |
| elif sec_confidence < 0.5 or not has_market_data: | |
| return "LOW" | |
| else: | |
| return "MEDIUM" | |
| def _identify_risks( | |
| self, sec_insights: Dict, market_insights: Dict | |
| ) -> List[RiskFactor]: | |
| """Identify key risk factors""" | |
| risks = [] | |
| # SEC-identified risks | |
| sec_risks = sec_insights.get("key_risks", []) | |
| for risk in sec_risks[:3]: # Top 3 | |
| risks.append( | |
| RiskFactor( | |
| category="Fundamental", | |
| description=risk.get("phrase", "Unknown risk"), | |
| severity="high" if risk.get("importance", 0) > 0.5 else "medium", | |
| evidence=["SEC filing analysis"], | |
| ) | |
| ) | |
| # Market risks | |
| tech_signals = market_insights.get("technical_signals", {}) | |
| if tech_signals.get("rsi") == "OVERBOUGHT": | |
| risks.append( | |
| RiskFactor( | |
| category="Technical", | |
| description="Stock appears overbought (RSI > 70)", | |
| severity="medium", | |
| evidence=["Technical indicators"], | |
| ) | |
| ) | |
| # News risks | |
| if market_insights.get("news_sentiment") == "NEGATIVE": | |
| risks.append( | |
| RiskFactor( | |
| category="Sentiment", | |
| description="Negative news sentiment detected", | |
| severity="medium", | |
| evidence=market_insights.get("notable_events", [])[:2], | |
| ) | |
| ) | |
| return risks | |
| def _identify_opportunities( | |
| self, sec_insights: Dict, market_insights: Dict | |
| ) -> List[str]: | |
| """Identify key opportunities""" | |
| opportunities = [] | |
| # SEC opportunities | |
| sec_opps = sec_insights.get("key_opportunities", []) | |
| for opp in sec_opps[:3]: | |
| opportunities.append(f"{opp.get('phrase')} ({opp.get('component')})") | |
| # Technical opportunities | |
| tech_signals = market_insights.get("technical_signals", {}) | |
| if tech_signals.get("rsi") == "OVERSOLD": | |
| opportunities.append("Potentially oversold - technical bounce opportunity") | |
| return opportunities | |
| def _generate_action( | |
| self, sentiment: str, confidence: str, risks: List[RiskFactor] | |
| ) -> str: | |
| """Generate recommended action""" | |
| high_severity_risks = sum(1 for r in risks if r.severity == "high") | |
| if sentiment == "BULLISH": | |
| if confidence == "HIGH" and high_severity_risks == 0: | |
| return "STRONG BUY - High conviction opportunity" | |
| elif confidence == "MEDIUM" or high_severity_risks <= 1: | |
| return "BUY - Favorable risk/reward, size position appropriately" | |
| else: | |
| return "HOLD - Monitor for risk reduction before entering" | |
| elif sentiment == "BEARISH": | |
| if confidence == "HIGH": | |
| return "SELL/AVOID - High confidence bearish outlook" | |
| else: | |
| return "HOLD - Bearish signals but insufficient confidence to sell" | |
| else: # NEUTRAL | |
| return "HOLD - Insufficient conviction either way, await clearer signals" | |
| def _build_reasoning( | |
| self, | |
| ticker: str, | |
| sentiment: str, | |
| sec_insights: Dict, | |
| market_insights: Dict, | |
| risks: List[RiskFactor], | |
| opportunities: List[str], | |
| ) -> str: | |
| """Build detailed reasoning for recommendation""" | |
| reasoning = f"Analysis of {ticker}:\n\n" | |
| # SEC analysis | |
| reasoning += "Fundamental Analysis (SEC Filings):\n" | |
| reasoning += f"- Overall sentiment: {sec_insights.get('overall_sentiment', 'N/A').upper()}\n" | |
| reasoning += f"- Confidence: {sec_insights.get('confidence', 0):.2%}\n" | |
| comp_sentiments = sec_insights.get("components", {}) | |
| if comp_sentiments: | |
| reasoning += "- Component analysis:\n" | |
| for comp, data in comp_sentiments.items(): | |
| reasoning += f" • {comp.replace('_', ' ').title()}: {data.get('sentiment', 'N/A').upper()}\n" | |
| # Market analysis | |
| reasoning += "\nMarket Analysis:\n" | |
| reasoning += f"- Price trend: {market_insights.get('price_trend', 'N/A')}\n" | |
| reasoning += f"- News sentiment: {market_insights.get('news_sentiment', 'N/A')}\n" | |
| tech_signals = market_insights.get("technical_signals", {}) | |
| if tech_signals: | |
| reasoning += "- Technical signals:\n" | |
| for indicator, signal in tech_signals.items(): | |
| reasoning += f" • {indicator.upper()}: {signal}\n" | |
| # Synthesis | |
| reasoning += f"\nSynthesis: {sentiment} outlook" | |
| if risks: | |
| reasoning += f"\nKey risks identified: {len(risks)}" | |
| if opportunities: | |
| reasoning += f"\nKey opportunities: {', '.join(opportunities[:2])}" | |
| return reasoning | |
| def _create_summary(self, recommendation: InvestmentRecommendation) -> str: | |
| """Create concise summary""" | |
| summary = f""" | |
| Investment Recommendation for {recommendation.ticker} | |
| Sentiment: {recommendation.sentiment} | |
| Confidence: {recommendation.confidence} | |
| Key Risks: | |
| {chr(10).join(f'- {risk.description}' for risk in recommendation.key_risks[:3])} | |
| Recommended Action: {recommendation.recommended_action} | |
| """ | |
| return summary.strip() | |