First_agent_template / analyze_corporate_finance.py
AtmanLi's picture
Rename app.py to analyze_corporate_finance.py
b5918f8 verified
raw
history blame
17.1 kB
from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool
import datetime
import requests
import pytz
import yaml
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import io
import base64
from enum import Enum
from tools.final_answer import FinalAnswerTool
from Gradio_UI import GradioUI
# 基础工具定义
@tool
def my_custom_tool(arg1: str, arg2: int) -> str:
"""A tool that does nothing yet
Args:
arg1: the first argument
arg2: the second argument
"""
return "What magic will you build ?"
@tool
def get_current_time_in_timezone(timezone: str) -> str:
"""A tool that fetches the current local time in a specified timezone.
Args:
timezone: A string representing a valid timezone (e.g., 'America/New_York').
"""
try:
tz = pytz.timezone(timezone)
local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
return f"The current local time in {timezone} is: {local_time}"
except Exception as e:
return f"Error fetching time for timezone '{timezone}': {str(e)}"
# 公司收支数据分析工具
class AnalysisType(Enum):
OVERVIEW = "overview"
TREND = "trend"
CATEGORY = "category"
BUDGET = "budget"
CASH_FLOW = "cash_flow"
DEPARTMENT = "department"
@tool
def analyze_corporate_finance(data_source: str,
analysis_type: str = "overview",
period: Optional[str] = None,
department: Optional[str] = None,
export_format: str = "text") -> str:
"""
公司收支数据分析工具 - 专业财务分析系统
Args:
data_source: 数据源路径(CSV/Excel)或JSON格式的财务数据
analysis_type: 分析类型 - overview, trend, category, budget, cash_flow, department
period: 时间周期筛选,如 '2024-Q1:2024-Q4' 或 'last_12_months'
department: 部门筛选,如 '销售部', '技术部'
export_format: 输出格式 - text, detailed, visual
"""
try:
# 加载和预处理数据
df = load_financial_data(data_source)
df = preprocess_financial_data(df)
# 应用筛选条件
df = apply_filters(df, period, department)
if len(df) == 0:
return "没有找到符合条件的数据,请检查筛选条件"
# 执行分析
if analysis_type == AnalysisType.OVERVIEW.value:
result = generate_financial_overview(df)
elif analysis_type == AnalysisType.TREND.value:
result = analyze_financial_trends(df)
elif analysis_type == AnalysisType.CATEGORY.value:
result = analyze_category_performance(df)
elif analysis_type == AnalysisType.BUDGET.value:
result = analyze_budget_variance(df)
elif analysis_type == AnalysisType.CASH_FLOW.value:
result = analyze_cash_flow(df)
elif analysis_type == AnalysisType.DEPARTMENT.value:
result = analyze_department_performance(df)
else:
return "不支持的分析类型,请选择: overview, trend, category, budget, cash_flow, department"
# 根据输出格式调整结果
if export_format == "visual":
chart_url = generate_financial_charts(df, analysis_type)
return f"{result}\n\n![财务分析图表]({chart_url})"
elif export_format == "detailed":
return generate_detailed_report(df, analysis_type)
else:
return result
except Exception as e:
return f"财务分析过程中出现错误: {str(e)}"
def load_financial_data(data_source: str) -> pd.DataFrame:
"""加载财务数据,支持多种格式"""
try:
if data_source.endswith('.csv'):
return pd.read_csv(data_source)
elif data_source.endswith(('.xlsx', '.xls')):
return pd.read_excel(data_source)
else:
# 尝试解析为JSON
return pd.read_json(io.StringIO(data_source))
except Exception as e:
raise Exception(f"数据加载失败: {str(e)}")
def preprocess_financial_data(df: pd.DataFrame) -> pd.DataFrame:
"""财务数据清洗和预处理"""
# 基本列名标准化
column_mapping = {
'日期': 'date', '时间': 'date', '交易日期': 'date',
'金额': 'amount', '收支金额': 'amount', '交易金额': 'amount',
'类型': 'type', '收支类型': 'type', '交易类型': 'type',
'类别': 'category', '分类': 'category', '收支类别': 'category',
'部门': 'department', '所属部门': 'department'
}
df.columns = [column_mapping.get(col, col) for col in df.columns]
# 确保必要列存在
required_columns = ['date', 'amount', 'type']
for col in required_columns:
if col not in df.columns:
raise Exception(f"缺少必要列: {col}")
# 日期处理
df['date'] = pd.to_datetime(df['date'], errors='coerce')
df = df.dropna(subset=['date'])
# 数值处理
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
df['amount'] = df['amount'].fillna(0)
# 分类字段处理
if 'category' not in df.columns:
df['category'] = '未分类'
else:
df['category'] = df['category'].fillna('未分类')
if 'department' not in df.columns:
df['department'] = '通用部门'
else:
df['department'] = df['department'].fillna('通用部门')
# 添加时间维度
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['quarter'] = df['date'].dt.quarter
df['year_month'] = df['date'].dt.to_period('M')
df['day_of_week'] = df['date'].dt.day_name()
return df
def apply_filters(df: pd.DataFrame, period: Optional[str],
department: Optional[str]) -> pd.DataFrame:
"""应用时间和部门筛选"""
if period:
if period == 'last_12_months':
cutoff_date = datetime.now() - timedelta(days=365)
df = df[df['date'] >= cutoff_date]
elif ':' in period:
start_str, end_str = period.split(':')
start_date = pd.to_datetime(start_str)
end_date = pd.to_datetime(end_str)
df = df[(df['date'] >= start_date) & (df['date'] <= end_date)]
if department:
df = df[df['department'] == department]
return df
def generate_financial_overview(df: pd.DataFrame) -> str:
"""生成财务概览报告"""
total_income = df[df['type'] == '收入']['amount'].sum()
total_expense = df[df['type'] == '支出']['amount'].sum()
net_profit = total_income - total_expense
# 基本统计
total_transactions = len(df)
avg_transaction_amount = df['amount'].mean()
# 时间范围
date_range = f"{df['date'].min().strftime('%Y-%m-%d')}{df['date'].max().strftime('%Y-%m-%d')}"
# 部门统计
dept_stats = df.groupby('department').agg({
'amount': ['sum', 'count']
}).round(2)
overview = f"""
🏢 **公司财务概览报告**
**分析时间范围:** {date_range}
📊 **核心财务指标:**
• 总收入: ¥{total_income:,.2f}
• 总支出: ¥{total_expense:,.2f}
• 净利润: ¥{net_profit:,.2f}
• 利润率: {net_profit/total_income*100 if total_income > 0 else 0:.1f}%
📈 **交易统计:**
• 总交易笔数: {total_transactions}
• 平均交易金额: ¥{avg_transaction_amount:,.2f}
• 收入交易: {len(df[df['type'] == '收入'])}
• 支出交易: {len(df[df['type'] == '支出'])}
👥 **部门贡献概览:**
{dept_stats.to_string()}
"""
return overview
def analyze_financial_trends(df: pd.DataFrame) -> str:
"""分析财务趋势"""
# 月度趋势分析
monthly_data = df.groupby('year_month').agg({
'amount': ['sum', 'count', 'mean']
}).round(2)
# 收入支出对比
monthly_breakdown = df.groupby(['year_month', 'type'])['amount'].sum().unstack(fill_value=0)
monthly_breakdown['净利润'] = monthly_breakdown.get('收入', 0) - monthly_breakdown.get('支出', 0)
trend_analysis = f"""
📈 **财务趋势分析报告**
📅 **月度趋势指标:**
{monthly_data.to_string()}
💰 **收入支出对比:**
{monthly_breakdown.to_string()}
🔍 **趋势洞察:**
• 月均交易额: ¥{monthly_data[('amount', 'sum')].mean():,.2f}
• 月均交易笔数: {monthly_data[('amount', 'count')].mean():.0f}
• 增长趋势: {'上升' if monthly_breakdown['净利润'].iloc[-1] > monthly_breakdown['净利润'].iloc[0] else '平稳'}
"""
return trend_analysis
def analyze_category_performance(df: pd.DataFrame) -> str:
"""分析收支类别绩效"""
category_stats = df.groupby(['category', 'type']).agg({
'amount': ['sum', 'count', 'mean']
}).round(2)
category_stats.columns = ['总金额', '交易笔数', '平均金额']
category_stats = category_stats.reset_index()
# Top类别分析
top_income = category_stats[category_stats['type'] == '收入'].nlargest(5, '总金额')
top_expense = category_stats[category_stats['type'] == '支出'].nlargest(5, '总金额')
category_analysis = f"""
🏷️ **收支类别绩效分析**
📊 **Top 5 收入类别:**
{top_income[['category', '总金额', '交易笔数']].to_string(index=False)}
📊 **Top 5 支出类别:**
{top_expense[['category', '总金额', '交易笔数']].to_string(index=False)}
💡 **效率指标:**
• 收入集中度: 前3类别占比 {top_income['总金额'].head(3).sum()/category_stats[category_stats['type']=='收入']['总金额'].sum()*100:.1f}%
• 支出集中度: 前3类别占比 {top_expense['总金额'].head(3).sum()/category_stats[category_stats['type']=='支出']['总金额'].sum()*100:.1f}%
"""
return category_analysis
def analyze_budget_variance(df: pd.DataFrame) -> str:
"""分析预算执行情况"""
# 这里使用模拟预算数据,实际应用中应该从数据库或配置文件中获取
budget_data = {
'销售收入': 1000000,
'技术服务': 500000,
'人力成本': 400000,
'营销费用': 200000,
'研发支出': 300000,
'行政管理': 150000
}
variance_report = "💰 **预算执行分析报告**\n\n"
actual_income = df[df['type'] == '收入'].groupby('category')['amount'].sum()
actual_expense = df[df['type'] == '支出'].groupby('category')['amount'].sum()
for category, budget in budget_data.items():
if category in actual_income.index:
actual = actual_income[category]
variance = (actual - budget) / budget * 100
status = "✅ 超额完成" if variance >= 0 else "⚠️ 未达预算"
variance_report += f"• {category}: 实际¥{actual:,.0f} / 预算¥{budget:,.0f} ({variance:+.1f}%) {status}\n"
elif category in actual_expense.index:
actual = actual_expense[category]
variance = (actual - budget) / budget * 100
status = "⚠️ 超预算" if variance > 0 else "✅ 预算内"
variance_report += f"• {category}: 实际¥{actual:,.0f} / 预算¥{budget:,.0f} ({variance:+.1f}%) {status}\n"
return variance_report
def analyze_cash_flow(df: pd.DataFrame) -> str:
"""分析现金流状况"""
monthly_cash_flow = df.groupby('year_month').apply(
lambda x: x[x['type'] == '收入']['amount'].sum() -
x[x['type'] == '支出']['amount'].sum()
)
cash_flow_analysis = f"""
💳 **现金流分析报告**
📊 **现金流指标:**
• 月均现金流: ¥{monthly_cash_flow.mean():,.2f}
• 现金流波动率: {monthly_cash_flow.std():.2f}
• 正现金流月份: {(monthly_cash_flow > 0).sum()}个月
• 负现金流月份: {(monthly_cash_flow < 0).sum()}个月
🔔 **现金流健康状况:**
{generate_cash_flow_health_assessment(monthly_cash_flow)}
"""
return cash_flow_analysis
def analyze_department_performance(df: pd.DataFrame) -> str:
"""部门绩效分析"""
dept_performance = df.groupby('department').agg({
'amount': ['sum', 'count', 'mean']
}).round(2)
dept_performance.columns = ['总金额', '交易笔数', '平均金额']
# 部门贡献分析
dept_income = df[df['type'] == '收入'].groupby('department')['amount'].sum()
dept_expense = df[df['type'] == '支出'].groupby('department')['amount'].sum()
dept_analysis = f"""
👥 **部门绩效分析报告**
📊 **各部门财务表现:**
{dept_performance.to_string()}
🎯 **部门贡献分析:**
• 收入Top部门: {dept_income.nlargest(3).to_string()}
• 支出Top部门: {dept_expense.nlargest(3).to_string()}
• 净收益最佳部门: {(dept_income - dept_expense).nlargest(3).to_string()}
"""
return dept_analysis
def generate_financial_charts(df: pd.DataFrame, analysis_type: str) -> str:
"""生成财务分析图表"""
plt.figure(figsize=(12, 8))
if analysis_type == AnalysisType.OVERVIEW.value:
# 收支趋势图
monthly_data = df.groupby(['year_month', 'type'])['amount'].sum().unstack()
monthly_data.plot(kind='line', ax=plt.gca())
plt.title('月度收支趋势')
plt.xticks(rotation=45)
elif analysis_type == AnalysisType.CATEGORY.value:
# 类别分布图
category_totals = df.groupby('category')['amount'].sum().nlargest(8)
plt.pie(category_totals.values, labels=category_totals.index, autopct='%1.1f%%')
plt.title('消费类别分布')
plt.tight_layout()
# 转换为base64编码图像
img_buffer = io.BytesIO()
plt.savefig(img_buffer, format='png', bbox_inches='tight')
img_buffer.seek(0)
img_str = base64.b64encode(img_buffer.read()).decode()
return f"data:image/png;base64,{img_str}"
def generate_detailed_report(df: pd.DataFrame, analysis_type: str) -> str:
"""生成详细分析报告"""
base_report = ""
if analysis_type == AnalysisType.OVERVIEW.value:
base_report = generate_financial_overview(df)
# 其他类型的详细报告...
detailed_stats = f"""
📋 **详细统计数据:**
📈 **描述性统计:**
{df['amount'].describe().to_string()}
📅 **时间范围统计:**
• 最早交易: {df['date'].min().strftime('%Y-%m-%d')}
• 最晚交易: {df['date'].max().strftime('%Y-%m-%d')}
• 分析天数: {(df['date'].max() - df['date'].min()).days}
"""
return base_report + detailed_stats
def generate_cash_flow_health_assessment(monthly_cash_flow: pd.Series) -> str:
"""生成现金流健康评估"""
negative_months = (monthly_cash_flow < 0).sum()
total_months = len(monthly_cash_flow)
if negative_months == 0:
return "🟢 优秀: 所有月份均为正现金流,财务状况非常健康"
elif negative_months <= total_months * 0.25:
return "🟡 良好: 少数月份出现负现金流,整体状况良好"
elif negative_months <= total_months * 0.5:
return "🟠 一般: 较多月份出现负现金流,需要关注资金管理"
else:
return "🔴 风险: 大部分月份为负现金流,存在资金风险"
# 快速财务检查工具
@tool
def quick_financial_check(file_path: str) -> str:
"""快速财务健康检查工具"""
try:
df = load_financial_data(file_path)
df = preprocess_financial_data(df)
overview = generate_financial_overview(df)
cash_flow = analyze_cash_flow(df)
return f"{overview}\n\n{cash_flow}"
except Exception as e:
return f"快速检查失败: {str(e)}"
# 初始化其他工具
final_answer = FinalAnswerTool()
# 配置模型
model = HfApiModel(
max_tokens=2096,
temperature=0.5,
model_id='Qwen/Qwen2.5-Coder-32B-Instruct',
custom_role_conversions=None,
)
# 加载其他工具
image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)
# 加载提示模板
with open("prompts.yaml", 'r') as stream:
prompt_templates = yaml.safe_load(stream)
# 创建代理并集成所有工具
agent = CodeAgent(
model=model,
tools=[
final_answer,
analyze_corporate_finance, # 公司财务分析工具
quick_financial_check, # 快速检查工具
image_generation_tool,
get_current_time_in_timezone,
my_custom_tool
],
max_steps=12, # 增加步骤限制以支持复杂分析
verbosity_level=1,
grammar=None,
planning_interval=None,
name="Corporate Finance Analyst Pro",
description="专业的企业财务分析AI助手,支持多维度收支数据分析和可视化",
prompt_templates=prompt_templates
)
# 启动Gradio界面
GradioUI(agent).launch()