financial-research-agent / src /agents /synthesis_agent.py
Sanchit7's picture
$(cat <<EOF
6601d76
"""
Synthesis Agent
Combines SEC filing analysis with market intelligence to generate final recommendation
Uses Gemini API for intelligent synthesis when available, falls back to rule-based logic
"""
from typing import Dict, Any, Optional, List
import logging
import os
from .base import BaseAgent
from ..core.types import (
AnalysisRequest,
InvestmentRecommendation,
RiskFactor,
EquityAnalysisResult,
)
logger = logging.getLogger(__name__)
# Import Gemini if available
try:
import google.generativeai as genai
GEMINI_AVAILABLE = True
except ImportError:
GEMINI_AVAILABLE = False
logger.warning("Gemini not available, using rule-based synthesis")
class SynthesisAgent(BaseAgent):
"""
Agent that synthesizes all analysis into actionable recommendation
- Cross-references SEC fundamentals with market action
- Identifies discrepancies (e.g., positive filings but negative price action)
- Generates evidence-based recommendation
"""
def __init__(self):
    """
    Set up the agent identity and, when possible, a Gemini model handle.

    After construction ``self.use_gemini`` is True only if the Gemini SDK
    was importable, ``GEMINI_API_KEY`` is set, and the client/model were
    created without raising. Otherwise ``self.model`` stays ``None`` and
    the agent relies on the rule-based synthesis.
    """
    super().__init__(
        name="Chief Investment Strategist",
        role="Synthesize multi-source analysis into actionable investment insights",
        goal="Provide clear, evidence-based investment recommendations",
    )

    # Start from the rule-based configuration; upgraded below on success.
    self.use_gemini = False
    self.model = None

    if not GEMINI_AVAILABLE:
        return

    gemini_key = os.getenv("GEMINI_API_KEY")
    if not gemini_key:
        logger.info("GEMINI_API_KEY not found, using rule-based synthesis")
        return

    try:
        genai.configure(api_key=gemini_key)
        # Model id is pinned to the experimental Gemini 2.0 Flash build.
        self.model = genai.GenerativeModel('gemini-2.0-flash-exp')
        self.use_gemini = True
        logger.info("Gemini 2.0 Flash initialized successfully")
    except Exception as init_error:
        logger.warning(f"Failed to initialize Gemini: {init_error}")
async def execute(
    self, request: AnalysisRequest, context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Synthesize all analyses into final recommendation.

    Args:
        request: Original analysis request; ``request.ticker`` is logged and
            the request is forwarded to the recommendation builder.
        context: Results from earlier agents, read from the ``"sec_agent"``
            and ``"market_agent"`` keys.

    Returns:
        A dict with ``status == "success"`` plus ``recommendation``,
        ``analysis_result`` and ``summary`` when synthesis completes;
        otherwise a dict with ``status == "error"`` and a ``message``.
        Exceptions raised during synthesis are logged and reported through
        the error dict instead of being raised to the caller.
    """
    try:
        logger.info(f"Synthesis Agent generating recommendation for {request.ticker}")

        if not context:
            return {
                "status": "error",
                "message": "No context provided for synthesis",
            }

        # An entry that is present but None/empty is treated exactly like a
        # missing one; with ``context.get(key, {})`` a stored None would make
        # the following ``.get`` calls raise and abort the whole synthesis.
        sec_result = context.get("sec_agent") or {}
        market_result = context.get("market_agent") or {}

        # Generate recommendation
        recommendation = self._generate_recommendation(
            request, sec_result, market_result
        )

        # Create complete analysis result
        analysis_result = EquityAnalysisResult(
            request=request,
            sec_analysis=sec_result.get("analysis"),
            market_intelligence=market_result.get("intelligence"),
            recommendation=recommendation,
            metadata={
                "sec_summary": sec_result.get("summary"),
                "market_summary": market_result.get("summary"),
            },
        )

        return {
            "status": "success",
            "recommendation": recommendation,
            "analysis_result": analysis_result,
            "summary": self._create_summary(recommendation),
        }
    except Exception as e:
        # Top-level boundary of the agent: report the failure as data so the
        # caller always receives a status dict.
        import traceback

        logger.error(f"Error in Synthesis agent: {str(e)}")
        logger.error(f"Full traceback: {traceback.format_exc()}")
        return {"status": "error", "message": str(e)}
def _generate_recommendation(
    self,
    request: AnalysisRequest,
    sec_result: Dict,
    market_result: Dict,
) -> InvestmentRecommendation:
    """
    Build the investment recommendation from both agents' results.

    The ``"insights"`` entry of each result dict is the only part read here.
    When ``self.use_gemini`` is set the Gemini path is tried first; any
    exception it raises is logged and the rule-based path below is used
    instead.
    """
    sec_view = sec_result.get("insights", {})
    market_view = market_result.get("insights", {})

    if self.use_gemini:
        try:
            return self._generate_with_gemini(request, sec_view, market_view)
        except Exception as gemini_error:
            logger.warning(
                f"Gemini synthesis failed, falling back to rules: {gemini_error}"
            )
            import traceback

            logger.error(f"Full traceback: {traceback.format_exc()}")
            # No return here on purpose: continue with the rule-based path.

    # Rule-based path (default, or fallback after a Gemini failure).
    overall = self._determine_sentiment(sec_view, market_view)
    conviction = self._determine_confidence(sec_view, market_view)
    risk_factors = self._identify_risks(sec_view, market_view)
    upside = self._identify_opportunities(sec_view, market_view)
    next_step = self._generate_action(overall, conviction, risk_factors)
    rationale = self._build_reasoning(
        request.ticker, overall, sec_view, market_view, risk_factors, upside
    )

    fields = {
        "ticker": request.ticker,
        "sentiment": overall,
        "confidence": conviction,
        "key_risks": risk_factors,
        "key_opportunities": upside,
        "recommended_action": next_step,
        "reasoning": rationale,
    }
    return InvestmentRecommendation(**fields)
def _generate_with_gemini(
    self,
    request: AnalysisRequest,
    sec_insights: Dict,
    market_insights: Dict,
) -> InvestmentRecommendation:
    """
    Generate the recommendation by prompting the configured Gemini model.

    The prompt lists the SEC sentiment data and the market signals found in
    the two insight dicts and asks for a single JSON object. The reply is
    cleaned up, parsed and converted into an ``InvestmentRecommendation``.

    Args:
        request: Analysis request; only ``request.ticker`` is read.
        sec_insights: SEC-side insights. Keys read: ``overall_sentiment``,
            ``confidence``, ``components``, ``key_risks``,
            ``key_opportunities``.
        market_insights: Market-side insights. Keys read: ``price_trend``,
            ``news_sentiment``, ``technical_signals``, ``notable_events``.

    Returns:
        Recommendation built from the model's JSON reply. Fields the model
        left out fall back to NEUTRAL / MEDIUM / HOLD defaults.

    Raises:
        json.JSONDecodeError: If the cleaned reply is not valid JSON.
        ValueError: If the reply parses to something other than a JSON
            object.
        Exception: Anything raised by ``self.model.generate_content`` or by
            reading ``response.text`` is propagated unchanged.
    """
    import json
    import re

    def as_label(value: Any) -> str:
        """Upper-case a sentiment-like value; None becomes 'N/A'."""
        return "N/A" if value is None else str(value).upper()

    def as_percent(value: Any) -> str:
        """Format a 0-1 score as a percentage; non-numeric becomes 'N/A'."""
        try:
            return f"{float(value):.2%}"
        except (TypeError, ValueError):
            return "N/A"

    # ---- Prompt: SEC section -------------------------------------------
    lines = [
        f"You are a Chief Investment Strategist analyzing equity {request.ticker}.",
        "**SEC Filing Analysis (FinBERT/SEC-BERT sentiment analysis):**",
        f"- Overall Sentiment: {as_label(sec_insights.get('overall_sentiment', 'N/A'))}",
        f"- Confidence: {as_percent(sec_insights.get('confidence', 0))}",
        "Component Analysis:",
    ]

    components = sec_insights.get('components', {})
    if isinstance(components, dict):
        for comp, data in components.items():
            if isinstance(data, dict):
                title = str(comp).replace('_', ' ').title()
                label = as_label(data.get('sentiment', 'N/A'))
                score = as_percent(data.get('confidence', 0))
                lines.append(f"- {title}: {label} ({score})")

    # Risk and opportunity entries may be dicts (with a 'phrase') or plain
    # strings; anything else is ignored. Only the first five are sent.
    lines += ["", "Key Risk Factors from SEC Filings:"]
    key_risks = sec_insights.get('key_risks', [])
    if isinstance(key_risks, list):
        for risk in key_risks[:5]:
            if isinstance(risk, dict):
                lines.append(f"- {risk.get('phrase', 'N/A')}")
            elif isinstance(risk, str):
                lines.append(f"- {risk}")

    lines += ["", "Key Opportunities from SEC Filings:"]
    key_opps = sec_insights.get('key_opportunities', [])
    if isinstance(key_opps, list):
        for opp in key_opps[:5]:
            if isinstance(opp, dict):
                lines.append(
                    f"- {opp.get('phrase', 'N/A')} ({opp.get('component', 'N/A')})"
                )
            elif isinstance(opp, str):
                lines.append(f"- {opp}")

    # ---- Prompt: market section ----------------------------------------
    lines += [
        "",
        "**Market Intelligence:**",
        f"- Price Trend: {market_insights.get('price_trend', 'N/A')}",
        f"- News Sentiment: {market_insights.get('news_sentiment', 'N/A')}",
        "Technical Signals:",
    ]
    tech_signals = market_insights.get('technical_signals', {})
    if isinstance(tech_signals, dict):
        for indicator, signal in tech_signals.items():
            lines.append(f"- {str(indicator).upper()}: {signal}")

    notable_events = market_insights.get('notable_events', [])
    if isinstance(notable_events, list) and notable_events:
        lines += ["", "Notable News Events:"]
        lines += [f"- {event}" for event in notable_events[:3] if isinstance(event, str)]

    # ---- Prompt: task and expected JSON shape --------------------------
    lines += [
        "",
        "**Your Task:**",
        "Synthesize the above fundamental analysis (SEC filings) and market intelligence into a comprehensive investment recommendation.",
        "IMPORTANT: Return ONLY valid JSON. No markdown, no code blocks, no extra text. Use spaces instead of tabs or special characters.",
        "{",
        '"sentiment": "BULLISH or BEARISH or NEUTRAL",',
        '"confidence": "HIGH or MEDIUM or LOW",',
        '"risks": [',
        '{"category": "Fundamental or Technical or Sentiment", "description": "single line description", "severity": "high or medium or low", "evidence": ["brief evidence point"]}',
        "],",
        '"opportunities": ["opportunity 1", "opportunity 2"],',
        '"action": "STRONG BUY or BUY or HOLD or SELL or AVOID - brief rationale",',
        '"reasoning": "2-3 paragraph analysis with no special formatting or line breaks within paragraphs"',
        "}",
    ]
    prompt = "\n".join(lines) + "\n"

    # ---- Model call ----------------------------------------------------
    logger.info(f"Calling Gemini 2.0 Flash for {request.ticker} synthesis...")
    response = self.model.generate_content(
        prompt,
        generation_config={"response_mime_type": "application/json"},
    )

    # ---- Reply clean-up and parsing ------------------------------------
    response_text = response.text.strip()
    logger.debug(f"Gemini raw response (first 500 chars): {response_text[:500]}")

    # Unwrap a markdown code fence if the model added one anyway.
    if "```json" in response_text:
        response_text = response_text.split("```json")[1].split("```")[0].strip()
    elif "```" in response_text:
        response_text = response_text.split("```")[1].split("```")[0].strip()

    # Raw newlines/tabs inside JSON strings are invalid JSON, so collapse
    # every whitespace run to one space (this also covers \n, \r and \t),
    # then blank out any remaining non-printable characters.
    response_text = re.sub(r'\s+', ' ', response_text)
    response_text = ''.join(
        char if char.isprintable() else ' ' for char in response_text
    )

    try:
        result = json.loads(response_text)
    except json.JSONDecodeError as e:
        logger.error(f"JSON decode error: {e}")
        logger.error(f"Problematic JSON (first 1000 chars): {response_text[:1000]}")
        raise

    if not isinstance(result, dict):
        # Fail with a clear message; the caller treats any exception as the
        # signal to fall back to the rule-based path.
        raise ValueError(
            f"Expected a JSON object from Gemini, got {type(result).__name__}"
        )

    # ---- Conversion to project types -----------------------------------
    raw_risks = result.get("risks")
    if not isinstance(raw_risks, list):
        raw_risks = []
    risks = []
    for entry in raw_risks:
        if isinstance(entry, str):
            # A bare string is kept as the description with default fields.
            entry = {"description": entry}
        if not isinstance(entry, dict):
            continue
        risks.append(
            RiskFactor(
                category=entry.get("category", "Unknown"),
                description=entry.get("description", ""),
                severity=entry.get("severity", "medium"),
                evidence=entry.get("evidence", []),
            )
        )

    opportunities = result.get("opportunities")
    if isinstance(opportunities, str):
        opportunities = [opportunities] if opportunities else []
    elif not isinstance(opportunities, list):
        opportunities = []

    # The rule-based path emits upper-case labels; normalise the model's
    # answer the same way so both paths produce the same vocabulary.
    sentiment = str(result.get("sentiment") or "NEUTRAL").strip().upper()
    confidence = str(result.get("confidence") or "MEDIUM").strip().upper()

    return InvestmentRecommendation(
        ticker=request.ticker,
        sentiment=sentiment,
        confidence=confidence,
        key_risks=risks,
        key_opportunities=opportunities,
        recommended_action=result.get("action") or "HOLD - Insufficient data",
        reasoning=result.get("reasoning") or "No reasoning provided",
    )
def _determine_sentiment(
self, sec_insights: Dict, market_insights: Dict
) -> str:
"""Determine overall sentiment (BULLISH/BEARISH/NEUTRAL)"""
# SEC sentiment
sec_sentiment = sec_insights.get("overall_sentiment", "neutral")
# Market sentiment
price_trend = market_insights.get("price_trend", "NEUTRAL")
news_sentiment = market_insights.get("news_sentiment", "NEUTRAL")
# Weighted decision
bullish_signals = 0
bearish_signals = 0
# SEC analysis (higher weight)
if sec_sentiment == "positive":
bullish_signals += 2
elif sec_sentiment == "negative":
bearish_signals += 2
# Price trend
if price_trend == "BULLISH":
bullish_signals += 1
elif price_trend == "BEARISH":
bearish_signals += 1
# News sentiment
if news_sentiment == "POSITIVE":
bullish_signals += 1
elif news_sentiment == "NEGATIVE":
bearish_signals += 1
# Determine overall
if bullish_signals > bearish_signals + 1:
return "BULLISH"
elif bearish_signals > bullish_signals + 1:
return "BEARISH"
else:
return "NEUTRAL"
def _determine_confidence(
self, sec_insights: Dict, market_insights: Dict
) -> str:
"""Determine confidence level (HIGH/MEDIUM/LOW)"""
sec_confidence = sec_insights.get("confidence", 0)
has_market_data = bool(market_insights.get("technical_signals"))
has_news = bool(market_insights.get("notable_events"))
# High confidence if SEC is confident and market confirms
if sec_confidence > 0.7 and has_market_data and has_news:
return "HIGH"
# Low confidence if missing data or conflicting signals
elif sec_confidence < 0.5 or not has_market_data:
return "LOW"
else:
return "MEDIUM"
def _identify_risks(
self, sec_insights: Dict, market_insights: Dict
) -> List[RiskFactor]:
"""Identify key risk factors"""
risks = []
# SEC-identified risks
sec_risks = sec_insights.get("key_risks", [])
for risk in sec_risks[:3]: # Top 3
risks.append(
RiskFactor(
category="Fundamental",
description=risk.get("phrase", "Unknown risk"),
severity="high" if risk.get("importance", 0) > 0.5 else "medium",
evidence=["SEC filing analysis"],
)
)
# Market risks
tech_signals = market_insights.get("technical_signals", {})
if tech_signals.get("rsi") == "OVERBOUGHT":
risks.append(
RiskFactor(
category="Technical",
description="Stock appears overbought (RSI > 70)",
severity="medium",
evidence=["Technical indicators"],
)
)
# News risks
if market_insights.get("news_sentiment") == "NEGATIVE":
risks.append(
RiskFactor(
category="Sentiment",
description="Negative news sentiment detected",
severity="medium",
evidence=market_insights.get("notable_events", [])[:2],
)
)
return risks
def _identify_opportunities(
self, sec_insights: Dict, market_insights: Dict
) -> List[str]:
"""Identify key opportunities"""
opportunities = []
# SEC opportunities
sec_opps = sec_insights.get("key_opportunities", [])
for opp in sec_opps[:3]:
opportunities.append(f"{opp.get('phrase')} ({opp.get('component')})")
# Technical opportunities
tech_signals = market_insights.get("technical_signals", {})
if tech_signals.get("rsi") == "OVERSOLD":
opportunities.append("Potentially oversold - technical bounce opportunity")
return opportunities
def _generate_action(
self, sentiment: str, confidence: str, risks: List[RiskFactor]
) -> str:
"""Generate recommended action"""
high_severity_risks = sum(1 for r in risks if r.severity == "high")
if sentiment == "BULLISH":
if confidence == "HIGH" and high_severity_risks == 0:
return "STRONG BUY - High conviction opportunity"
elif confidence == "MEDIUM" or high_severity_risks <= 1:
return "BUY - Favorable risk/reward, size position appropriately"
else:
return "HOLD - Monitor for risk reduction before entering"
elif sentiment == "BEARISH":
if confidence == "HIGH":
return "SELL/AVOID - High confidence bearish outlook"
else:
return "HOLD - Bearish signals but insufficient confidence to sell"
else: # NEUTRAL
return "HOLD - Insufficient conviction either way, await clearer signals"
def _build_reasoning(
self,
ticker: str,
sentiment: str,
sec_insights: Dict,
market_insights: Dict,
risks: List[RiskFactor],
opportunities: List[str],
) -> str:
"""Build detailed reasoning for recommendation"""
reasoning = f"Analysis of {ticker}:\n\n"
# SEC analysis
reasoning += "Fundamental Analysis (SEC Filings):\n"
reasoning += f"- Overall sentiment: {sec_insights.get('overall_sentiment', 'N/A').upper()}\n"
reasoning += f"- Confidence: {sec_insights.get('confidence', 0):.2%}\n"
comp_sentiments = sec_insights.get("components", {})
if comp_sentiments:
reasoning += "- Component analysis:\n"
for comp, data in comp_sentiments.items():
reasoning += f" • {comp.replace('_', ' ').title()}: {data.get('sentiment', 'N/A').upper()}\n"
# Market analysis
reasoning += "\nMarket Analysis:\n"
reasoning += f"- Price trend: {market_insights.get('price_trend', 'N/A')}\n"
reasoning += f"- News sentiment: {market_insights.get('news_sentiment', 'N/A')}\n"
tech_signals = market_insights.get("technical_signals", {})
if tech_signals:
reasoning += "- Technical signals:\n"
for indicator, signal in tech_signals.items():
reasoning += f" • {indicator.upper()}: {signal}\n"
# Synthesis
reasoning += f"\nSynthesis: {sentiment} outlook"
if risks:
reasoning += f"\nKey risks identified: {len(risks)}"
if opportunities:
reasoning += f"\nKey opportunities: {', '.join(opportunities[:2])}"
return reasoning
def _create_summary(self, recommendation: InvestmentRecommendation) -> str:
"""Create concise summary"""
summary = f"""
Investment Recommendation for {recommendation.ticker}
Sentiment: {recommendation.sentiment}
Confidence: {recommendation.confidence}
Key Risks:
{chr(10).join(f'- {risk.description}' for risk in recommendation.key_risks[:3])}
Recommended Action: {recommendation.recommended_action}
"""
return summary.strip()